Merge branch 'master' into wb/issue-5908

This commit is contained in:
William Banfield
2021-08-12 12:25:12 -04:00
committed by GitHub
18 changed files with 615 additions and 266 deletions

3
.github/CODEOWNERS vendored
View File

@@ -7,5 +7,4 @@
# global owners are only requested if there isn't a more specific
# codeowner specified below. For this reason, the global codeowners
# are often repeated in package-level definitions.
* @alexanderbez @ebuchman @cmwaters @tessr @tychoish @williambanfield
* @alexanderbez @ebuchman @cmwaters @tessr @tychoish @williambanfield @creachadair

View File

@@ -24,10 +24,10 @@ This guide provides instructions for upgrading to specific versions of Tendermin
* Added `--mode` flag and `mode` config variable on `config.toml` for setting Mode of the Node: `full` | `validator` | `seed` (default: `full`)
[ADR-52](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-052-tendermint-mode.md)
* `BootstrapPeers` has been added as part of the new p2p stack. This will eventually replace
`Seeds`. Bootstrap peers are connected with on startup if needed for peer discovery. Unlike
persistent peers, there's no guarantee that the node will remain connected with these peers.
persistent peers, there's no gaurantee that the node will remain connected with these peers.
* configuration values starting with `priv-validator-` have moved to the new
`priv-validator` section, without the `priv-validator-` prefix.
@@ -35,6 +35,27 @@ This guide provides instructions for upgrading to specific versions of Tendermin
* The fast sync process as well as the blockchain package and service has all
been renamed to block sync
### Key Format Changes
The format of all tendermint on-disk database keys changes in
0.35. Upgrading nodes must either re-sync all data or run a migration
script provided in this release. The script located in
`github.com/tendermint/tendermint/scripts/keymigrate/migrate.go`
provides the function `Migrate(context.Context, db.DB)` which you can
operationalize as makes sense for your deployment.
For ease of use the `tendermint` command includes a CLI version of the
migration script, which you can invoke, as in:
tendermint key-migrate
This reads the configuration file as normal and allows the
`--db-backend` and `--db-dir` flags to change database operations as
needed.
The migration operation is idempotent and can be run more than once,
if needed.
### CLI Changes
* You must now specify the node mode (validator|full|seed) in `tendermint init [mode]`
@@ -66,7 +87,7 @@ are:
- `blockchain`
- `evidence`
Accordingly, the space `node` package was changed to reduce access to
Accordingly, the `node` package was changed to reduce access to
tendermint internals: applications that use tendermint as a library
will need to change to accommodate these changes. Most notably:
@@ -81,6 +102,16 @@ will need to change to accommodate these changes. Most notably:
Mark gRPC in the RPC layer as deprecated and to be removed in 0.36.
### Support for Custom Reactor and Mempool Implementations
The changes to p2p layer removed existing support for custom
reactors. Based on our understanding of how this functionality was
used, the introduction of the prioritized mempool covers nearly all of
the use cases for custom reactors. If you are currently running custom
reactors and mempools and are having trouble seeing the migration path
for your project please feel free to reach out to the Tendermint Core
development team directly.
## v0.34.0
**Upgrading to Tendermint 0.34 requires a blockchain restart.**
@@ -234,8 +265,8 @@ Other user-relevant changes include:
* The old `lite` package was removed; the new light client uses the `light` package.
* The `Verifier` was broken up into two pieces:
* Core verification logic (pure `VerifyX` functions)
* `Client` object, which represents the complete light client
* Core verification logic (pure `VerifyX` functions)
* `Client` object, which represents the complete light client
* The new light clients stores headers & validator sets as `LightBlock`s
* The RPC client can be found in the `/rpc` directory.
* The HTTP(S) proxy is located in the `/proxy` directory.
@@ -367,12 +398,12 @@ Evidence Params has been changed to include duration.
### Go API
* `libs/common` has been removed in favor of specific pkgs.
* `async`
* `service`
* `rand`
* `net`
* `strings`
* `cmap`
* `async`
* `service`
* `rand`
* `net`
* `strings`
* `cmap`
* removal of `errors` pkg
### RPC Changes
@@ -441,9 +472,9 @@ Prior to the update, suppose your `ResponseDeliverTx` look like:
```go
abci.ResponseDeliverTx{
Tags: []kv.Pair{
{Key: []byte("sender"), Value: []byte("foo")},
{Key: []byte("recipient"), Value: []byte("bar")},
{Key: []byte("amount"), Value: []byte("35")},
{Key: []byte("sender"), Value: []byte("foo")},
{Key: []byte("recipient"), Value: []byte("bar")},
{Key: []byte("amount"), Value: []byte("35")},
}
}
```
@@ -462,14 +493,14 @@ the following `Events`:
```go
abci.ResponseDeliverTx{
Events: []abci.Event{
{
Type: "transfer",
Attributes: kv.Pairs{
{Key: []byte("sender"), Value: []byte("foo")},
{Key: []byte("recipient"), Value: []byte("bar")},
{Key: []byte("amount"), Value: []byte("35")},
},
}
{
Type: "transfer",
Attributes: kv.Pairs{
{Key: []byte("sender"), Value: []byte("foo")},
{Key: []byte("recipient"), Value: []byte("bar")},
{Key: []byte("amount"), Value: []byte("35")},
},
}
}
```
@@ -517,9 +548,9 @@ In this case, the WS client will receive an error with description:
"jsonrpc": "2.0",
"id": "{ID}#event",
"error": {
"code": -32000,
"msg": "Server error",
"data": "subscription was canceled (reason: client is not pulling messages fast enough)" // or "subscription was canceled (reason: Tendermint exited)"
"code": -32000,
"msg": "Server error",
"data": "subscription was canceled (reason: client is not pulling messages fast enough)" // or "subscription was canceled (reason: Tendermint exited)"
}
}
@@ -725,9 +756,9 @@ just the `Data` field set:
```go
[]ProofOp{
ProofOp{
Data: <proof bytes>,
}
ProofOp{
Data: <proof bytes>,
}
}
```

View File

@@ -0,0 +1,64 @@
package commands
import (
"context"
"fmt"
"github.com/spf13/cobra"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/scripts/keymigrate"
)
func MakeKeyMigrateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "key-migrate",
Short: "Run Database key migration",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()
contexts := []string{
// this is ordered to put the
// (presumably) biggest/most important
// subsets first.
"blockstore",
"state",
"peerstore",
"tx_index",
"evidence",
"light",
}
for idx, dbctx := range contexts {
logger.Info("beginning a key migration",
"dbctx", dbctx,
"num", idx+1,
"total", len(contexts),
)
db, err := cfg.DefaultDBProvider(&cfg.DBContext{
ID: dbctx,
Config: config,
})
if err != nil {
return fmt.Errorf("constructing database handle: %w", err)
}
if err = keymigrate.Migrate(ctx, db); err != nil {
return fmt.Errorf("running migration for context %q: %w",
dbctx, err)
}
}
logger.Info("completed database migration successfully")
return nil
},
}
// allow database info to be overridden via cli
addDBFlags(cmd)
return cmd
}

View File

@@ -83,7 +83,10 @@ func AddNodeFlags(cmd *cobra.Command) {
config.Consensus.CreateEmptyBlocksInterval.String(),
"the possible interval between empty blocks")
// db flags
addDBFlags(cmd)
}
func addDBFlags(cmd *cobra.Command) {
cmd.Flags().String(
"db-backend",
config.DBBackend,

View File

@@ -29,6 +29,7 @@ func main() {
cmd.GenNodeKeyCmd,
cmd.VersionCmd,
cmd.InspectCmd,
cmd.MakeKeyMigrateCommand(),
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),
)

View File

@@ -97,3 +97,4 @@ Note the context/background should be written in the present tense.
- [ADR-041: Proposer-Selection-via-ABCI](./adr-041-proposer-selection-via-abci.md)
- [ADR-045: ABCI-Evidence](./adr-045-abci-evidence.md)
- [ADR-057: RPC](./adr-057-RPC.md)
- [ADR-069: Node Initialization](./adr-069-flexible-node-initialization.md)

View File

@@ -0,0 +1,273 @@
# ADR 069: Flexible Node Initialization
## Changlog
- 2021-06-09: Initial Draft (@tychoish)
- 2021-07-21: Major Revision (@tychoish)
## Status
Proposed.
## Context
In an effort to support [Go-API-Stability](./adr-060-go-api-stability.md),
during the 0.35 development cycle, we have attempted to reduce the the API
surface area by moving most of the interface of the `node` package into
unexported functions, as well as moving the reactors to an `internal`
package. Having this coincide with the 0.35 release made a lot of sense
because these interfaces were _already_ changing as a result of the `p2p`
[refactor](./adr-061-p2p-refactor-scope.md), so it made sense to think a bit
more about how tendermint exposes this API.
While the interfaces of the P2P layer and most of the node package are already
internalized, this precludes some operational patterns that are important to
users who use tendermint as a library. Specifically, introspecting the
tendermint node service and replacing components is not supported in the latest
version of the code, and some of these use cases would require maintaining a
vendor copy of the code. Adding these features requires rather extensive
(internal/implementation) changes to the `node` and `rpc` packages, and this
ADR describes a model for changing the way that tendermint nodes initialize, in
service of providing this kind of functionality.
We consider node initialization, because the current implemention
provides strong connections between all components, as well as between
the components of the node and the RPC layer, and being able to think
about the interactions of these components will help enable these
features and help define the requirements of the node package.
## Alternative Approaches
These alternatives are presented to frame the design space and to
contextualize the decision in terms of product requirements. These
ideas are not inherently bad, and may even be possible or desireable
in the (distant) future, and merely provide additional context for how
we, in the moment came to our decision(s).
### Do Nothing
The current implementation is functional and sufficient for the vast
majority of use cases (e.g., all users of the Cosmos-SDK as well as
anyone who runs tendermint and the ABCI application in separate
processes). In the current implementation, and even previous versions,
modifying node initialization or injecting custom components required
copying most of the `node` package, which required such users
to maintain a vendored copy of tendermint.
While this is (likely) not tenable in the long term, as users do want
more modularity, and the current service implementation is brittle and
difficult to maintain, in the short term it may be possible to delay
implementation somewhat. Eventually, however, we will need to make the
`node` package easier to maintain and reason about.
### Generic Service Pluggability
One possible system design would export interfaces (in the Golang
sense) for all components of the system, to permit runtime dependency
injection of all components in the system, so that users can compose
tendermint nodes of arbitrary user-supplied components.
Although this level of customization would provide benefits, it would be a huge
undertaking (particularly with regards to API design work) that we do not have
scope for at the moment. Eventually providing support for some kinds of
pluggability may be useful, so the current solution does not explicitly
foreclose the possibility of this alternative.
### Abstract Dependency Based Startup and Shutdown
The main proposal in this document makes tendermint node initialization simpler
and more abstract, but the system lacks a number of
features which daemon/service initialization could provide, such as a
system allowing the authors of services to control initialization and shutdown order
of components using dependency relationships.
Such a system could work by allowing services to declare
initialization order dependencies to other reactors (by ID, perhaps)
so that the node could decide the initialization based on the
dependencies declared by services rather than requiring the node to
encode this logic directly.
This level of configuration is probably more complicated than is needed. Given
that the authors of components in the current implementation of tendermint
already *do* need to know about other components, a dependency-based system
would probably be overly-abstract at this stage.
## Decisions
- To the greatest extent possible, factor the code base so that
packages are responsible for their own initialization, and minimize
the amount of code in the `node` package itself.
- As a design goal, reduce direct coupling and dependencies between
components in the implementation of `node`.
- Begin iterating on a more-flexible internal framework for
initializing tendermint nodes to make the initatilization process
less hard-coded by the implementation of the node objects.
- Reactors should not need to expose their interfaces *within* the
implementation of the node type
- This refactoring should be entirely opaque to users.
- These node initialization changes should not require a
reevaluation of the `service.Service` or a generic initialization
orchestration framework.
- Do not proactively provide a system for injecting
components/services within a tendtermint node, though make it
possible to retrofit this kind of plugability in the future if
needed.
- Prioritize implementation of p2p-based statesync reactor to obviate
need for users to inject a custom state-sync provider.
## Detailed Design
The [current
nodeImpl](https://github.com/tendermint/tendermint/blob/master/node/node.go#L47)
includes direct references to the implementations of each of the
reactors, which should be replaced by references to `service.Service`
objects. This will require moving construction of the [rpc
service](https://github.com/tendermint/tendermint/blob/master/node/node.go#L771)
into the constructor of
[makeNode](https://github.com/tendermint/tendermint/blob/master/node/node.go#L126). One
possible implementation of this would be to eliminate the current
`ConfigureRPC` method on the node package and instead [configure it
here](https://github.com/tendermint/tendermint/pull/6798/files#diff-375d57e386f20eaa5f09f02bb9d28bfc48ac3dca18d0325f59492208219e5618R441).
To avoid adding complexity to the `node` package, we will add a
composite service implementation to the `service` package
that implements `service.Service` and is composed of a sequence of
underlying `service.Service` objects and handles their
startup/shutdown in the specified sequential order.
Consensus, blocksync (*née* fast sync), and statesync all depend on
each other, and have significant initialization dependencies that are
presently encoded in the `node` package. As part of this change, a
new package/component (likely named `blocks` located at
`internal/blocks`) will encapsulate the initialization of these block
management areas of the code.
### Injectable Component Option
This section briefly describes a possible implementation for
user-supplied services running within a node. This should not be
implemented unless user-supplied components are a hard requirement for
a user.
In order to allow components to be replaced, a new public function
will be added to the public interface of `node` with a signature that
resembles the following:
```go
func NewWithServices(conf *config.Config,
logger log.Logger,
cf proxy.ClientCreator,
gen *types.GenesisDoc,
srvs []service.Service,
) (service.Service, error) {
```
The `service.Service` objects will be initialized in the order supplied, after
all pre-configured/default services have started (and shut down in reverse
order). The given services may implement additional interfaces, allowing them
to replace specific default services. `NewWithServices` will validate input
service lists with the following rules:
- None of the services may already be running.
- The caller may not supply more than one replacement reactor for a given
default service type.
If callers violate any of these rules, `NewWithServices` will return
an error. To retract support for this kind of operation in the future,
the function can be modified to *always* return an error.
## Consequences
### Positive
- The node package will become easier to maintain.
- It will become easier to add additional services within tendermint
nodes.
- It will become possible to replace default components in the node
package without vendoring the tendermint repo and modifying internal
code.
- The current end-to-end (e2e) test suite will be able to prevent any
regressions, and the new functionality can be thoroughly unit tested.
- The scope of this project is very narrow, which minimizes risk.
### Negative
- This increases our reliance on the `service.Service` interface which
is probably not an interface that we want to fully commit to.
- This proposal implements a fairly minimal set of functionality and
leaves open the possibility for many additional features which are
not included in the scope of this proposal.
### Neutral
N/A
## Open Questions
- To what extent does this new initialization framework need to accommodate
the legacy p2p stack? Would it be possible to delay a great deal of this
work to the 0.36 cycle to avoid this complexity?
- Answer: _depends on timing_, and the requirement to ship pluggable reactors in 0.35.
- Where should additional public types be exported for the 0.35
release?
Related to the general project of API stabilization we want to deprecate
the `types` package, and move its contents into a new `pkg` hierarchy;
however, the design of the `pkg` interface is currently underspecified.
If `types` is going to remain for the 0.35 release, then we should consider
the impact of using multiple organizing modalities for this code within a
single release.
## Future Work
- Improve or simplify the `service.Service` interface. There are some
pretty clear limitations with this interface as written (there's no
way to timeout slow startup or shut down, the cycle between the
`service.BaseService` and `service.Service` implementations is
troubling, the default panic in `OnReset` seems troubling.)
- As part of the refactor of `service.Service` have all services/nodes
respect the lifetime of a `context.Context` object, and avoid the
current practice of creating `context.Context` objects in p2p and
reactor code. This would be required for in-process multi-tenancy.
- Support explicit dependencies between components and allow for
parallel startup, so that different reactors can startup at the same
time, where possible.
## References
- [this
branch](https://github.com/tendermint/tendermint/tree/tychoish/scratch-node-minimize)
contains experimental work in the implementation of the node package
to unwind some of the hard dependencies between components.
- [the component
graph](https://peter.bourgon.org/go-for-industrial-programming/#the-component-graph)
as a framing for internal service construction.
## Appendix
### Dependencies
There's a relationship between the blockchain and consensus reactor
described by the following dependency graph makes replacing some of
these components more difficult relative to other reactors or
components.
![consensus blockchain dependency graph](./img/consensus_blockchain.png)

Binary file not shown.

After

Width:  |  Height:  |  Size: 672 KiB

2
go.mod
View File

@@ -37,7 +37,7 @@ require (
github.com/vektra/mockery/v2 v2.9.0
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
google.golang.org/grpc v1.39.1
google.golang.org/grpc v1.40.0
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
pgregory.net/rapid v0.4.7
)

4
go.sum
View File

@@ -1368,8 +1368,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.39.1 h1:f37vZbBVTiJ6jKG5mWz8ySOBxNqy6ViPgyhSdVnxF3E=
google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -27,15 +27,22 @@ func (bz *HexBytes) Unmarshal(data []byte) error {
return nil
}
// MarshalJSON implements the json.Marshaler interface. The hex bytes is a
// quoted hexadecimal encoded string.
// MarshalJSON implements the json.Marshaler interface. The encoding is a JSON
// quoted string of hexadecimal digits.
func (bz HexBytes) MarshalJSON() ([]byte, error) {
s := strings.ToUpper(hex.EncodeToString(bz))
jbz := make([]byte, len(s)+2)
jbz[0] = '"'
copy(jbz[1:], s)
jbz[len(jbz)-1] = '"'
return jbz, nil
size := hex.EncodedLen(len(bz)) + 2 // +2 for quotation marks
buf := make([]byte, size)
hex.Encode(buf[1:], []byte(bz))
buf[0] = '"'
buf[size-1] = '"'
// Ensure letter digits are capitalized.
for i := 1; i < size-1; i++ {
if buf[i] >= 'a' && buf[i] <= 'f' {
buf[i] = 'A' + (buf[i] - 'a')
}
}
return buf, nil
}
// UnmarshalJSON implements the json.Umarshaler interface.

View File

@@ -37,6 +37,7 @@ func TestJSONMarshal(t *testing.T) {
{[]byte(``), `{"B1":"","B2":""}`},
{[]byte(`a`), `{"B1":"YQ==","B2":"61"}`},
{[]byte(`abc`), `{"B1":"YWJj","B2":"616263"}`},
{[]byte("\x1a\x2b\x3c"), `{"B1":"Gis8","B2":"1A2B3C"}`},
}
for i, tc := range cases {

View File

@@ -231,34 +231,45 @@ func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
return err
}
var qs string
if args.Query != nil {
qs = args.Query.String()
}
s.mtx.RLock()
clientSubscriptions, ok := s.subscriptions[args.Subscriber]
if args.ID != "" {
qs, ok = clientSubscriptions[args.ID]
clientSubscriptions, err := func() (map[string]string, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
if ok && args.Query == nil {
var err error
args.Query, err = query.New(qs)
if err != nil {
return err
clientSubscriptions, ok := s.subscriptions[args.Subscriber]
if args.ID != "" {
qs, ok = clientSubscriptions[args.ID]
if ok && args.Query == nil {
var err error
args.Query, err = query.New(qs)
if err != nil {
return nil, err
}
}
} else if qs != "" {
args.ID, ok = clientSubscriptions[qs]
}
} else if qs != "" {
args.ID, ok = clientSubscriptions[qs]
}
s.mtx.RUnlock()
if !ok {
return ErrSubscriptionNotFound
if !ok {
return nil, ErrSubscriptionNotFound
}
return clientSubscriptions, nil
}()
if err != nil {
return err
}
select {
case s.cmds <- cmd{op: unsub, clientID: args.Subscriber, query: args.Query, subscription: &Subscription{id: args.ID}}:
s.mtx.Lock()
defer s.mtx.Unlock()
delete(clientSubscriptions, args.ID)
delete(clientSubscriptions, qs)
@@ -266,7 +277,6 @@ func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
if len(clientSubscriptions) == 0 {
delete(s.subscriptions, args.Subscriber)
}
s.mtx.Unlock()
return nil
case <-ctx.Done():
return ctx.Err()
@@ -288,8 +298,10 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID}:
s.mtx.Lock()
defer s.mtx.Unlock()
delete(s.subscriptions, clientID)
s.mtx.Unlock()
return nil
case <-ctx.Done():
return ctx.Err()

View File

@@ -18,7 +18,6 @@ import (
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
cs "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/pex"
@@ -37,7 +36,6 @@ import (
grpccore "github.com/tendermint/tendermint/rpc/grpc"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
@@ -71,16 +69,12 @@ type nodeImpl struct {
mempool mempool.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
pexReactor service.Service // for exchanging peer addresses
evidenceReactor service.Service
rpcListeners []net.Listener // rpc servers
eventSinks []indexer.EventSink
indexerService *indexer.Service
indexerService service.Service
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
}
@@ -371,46 +365,43 @@ func makeNode(config *cfg.Config,
// Note we currently use the addrBook regardless at least for AddOurAddress
var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
sw *p2p.Switch
addrBook pex.AddrBook
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.PexReactor {
if config.P2P.DisableLegacy {
addrBook = nil
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
if config.P2P.DisableLegacy {
addrBook = nil
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}
if config.RPC.PprofListenAddress != "" {
@@ -438,19 +429,37 @@ func makeNode(config *cfg.Config,
bcReactor: bcReactor,
mempoolReactor: mpReactor,
mempool: mp,
consensusState: csState,
consensusReactor: csReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
evidenceReactor: evReactor,
evidencePool: evPool,
proxyApp: proxyApp,
indexerService: indexerService,
eventBus: eventBus,
eventSinks: eventSinks,
rpcEnv: &rpccore.Environment{
ProxyAppQuery: proxyApp.Query(),
ProxyAppMempool: proxyApp.Mempool(),
StateStore: stateStore,
BlockStore: blockStore,
EvidencePool: evPool,
ConsensusState: csState,
P2PPeers: sw,
BlockSyncReactor: bcReactor.(cs.BlockSyncReactor),
GenDoc: genDoc,
EventSinks: eventSinks,
ConsensusReactor: csReactor,
EventBus: eventBus,
Mempool: mp,
Logger: logger.With("module", "rpc"),
Config: *config.RPC,
},
}
node.rpcEnv.P2PTransport = node
node.BaseService = *service.NewBaseService(logger, "Node", node)
return node, nil
@@ -483,25 +492,6 @@ func makeSeedNode(config *cfg.Config,
p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, config)
sw := createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID)
if err != nil {
@@ -515,8 +505,9 @@ func makeSeedNode(config *cfg.Config,
}
var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)
// add the pex reactor
@@ -526,11 +517,31 @@ func makeSeedNode(config *cfg.Config,
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
sw = createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}
@@ -553,8 +564,7 @@ func makeSeedNode(config *cfg.Config,
peerManager: peerManager,
router: router,
pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
pexReactor: pexReactor,
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
@@ -595,23 +605,22 @@ func (n *nodeImpl) OnStart() error {
}
n.isListening = true
n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy)
if n.config.P2P.DisableLegacy {
err = n.router.Start()
if err = n.router.Start(); err != nil {
return err
}
} else {
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
err = n.sw.Start()
}
if err != nil {
return err
if err = n.sw.Start(); err != nil {
return err
}
}
if n.config.Mode != cfg.ModeSeed {
if n.config.BlockSync.Version == cfg.BlockSyncV0 {
// Start the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Start(); err != nil {
return err
}
@@ -638,8 +647,8 @@ func (n *nodeImpl) OnStart() error {
}
}
if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Start(); err != nil {
if n.config.P2P.DisableLegacy {
if err := n.pexReactor.Start(); err != nil {
return err
}
} else {
@@ -648,7 +657,6 @@ func (n *nodeImpl) OnStart() error {
if err != nil {
return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
}
}
// Run state sync
@@ -723,10 +731,8 @@ func (n *nodeImpl) OnStop() {
}
}
if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
if err := n.pexReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
if n.config.P2P.DisableLegacy {
@@ -767,55 +773,23 @@ func (n *nodeImpl) OnStop() {
}
}
// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *nodeImpl) ConfigureRPC() (*rpccore.Environment, error) {
rpcCoreEnv := rpccore.Environment{
ProxyAppQuery: n.proxyApp.Query(),
ProxyAppMempool: n.proxyApp.Mempool(),
StateStore: n.stateStore,
BlockStore: n.blockStore,
EvidencePool: n.evidencePool,
ConsensusState: n.consensusState,
P2PPeers: n.sw,
P2PTransport: n,
GenDoc: n.genesisDoc,
EventSinks: n.eventSinks,
ConsensusReactor: n.consensusReactor,
EventBus: n.eventBus,
Mempool: n.mempool,
Logger: n.Logger.With("module", "rpc"),
Config: *n.config.RPC,
BlockSyncReactor: n.bcReactor.(cs.BlockSyncReactor),
}
func (n *nodeImpl) startRPC() ([]net.Listener, error) {
if n.config.Mode == cfg.ModeValidator {
pubKey, err := n.privValidator.GetPubKey(context.TODO())
if pubKey == nil || err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
rpcCoreEnv.PubKey = pubKey
n.rpcEnv.PubKey = pubKey
}
if err := rpcCoreEnv.InitGenesisChunks(); err != nil {
return nil, err
}
return &rpcCoreEnv, nil
}
func (n *nodeImpl) startRPC() ([]net.Listener, error) {
env, err := n.ConfigureRPC()
if err != nil {
if err := n.rpcEnv.InitGenesisChunks(); err != nil {
return nil, err
}
listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
routes := env.GetRoutes()
routes := n.rpcEnv.GetRoutes()
if n.config.RPC.Unsafe {
env.AddUnsafe(routes)
n.rpcEnv.AddUnsafe(routes)
}
config := rpcserver.DefaultConfig()
@@ -912,7 +886,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
return nil, err
}
go func() {
if err := grpccore.StartGRPCServer(env, listener); err != nil {
if err := grpccore.StartGRPCServer(n.rpcEnv, listener); err != nil {
n.Logger.Error("Error starting gRPC server", "err", err)
}
}()
@@ -945,46 +919,16 @@ func (n *nodeImpl) startPrometheusServer(addr string) *http.Server {
return srv
}
// Switch returns the Node's Switch.
func (n *nodeImpl) Switch() *p2p.Switch {
return n.sw
}
// BlockStore returns the Node's BlockStore.
func (n *nodeImpl) BlockStore() *store.BlockStore {
return n.blockStore
}
// ConsensusState returns the Node's ConsensusState.
func (n *nodeImpl) ConsensusState() *cs.State {
return n.consensusState
}
// ConsensusReactor returns the Node's ConsensusReactor.
func (n *nodeImpl) ConsensusReactor() *cs.Reactor {
return n.consensusReactor
}
// MempoolReactor returns the Node's mempool reactor.
func (n *nodeImpl) MempoolReactor() service.Service {
return n.mempoolReactor
}
// Mempool returns the Node's mempool.
func (n *nodeImpl) Mempool() mempool.Mempool {
return n.mempool
}
// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled.
func (n *nodeImpl) PEXReactor() *pex.Reactor {
return n.pexReactor
}
// EvidencePool returns the Node's EvidencePool.
func (n *nodeImpl) EvidencePool() *evidence.Pool {
return n.evidencePool
}
// EventBus returns the Node's EventBus.
func (n *nodeImpl) EventBus() *types.EventBus {
return n.eventBus
@@ -1001,19 +945,9 @@ func (n *nodeImpl) GenesisDoc() *types.GenesisDoc {
return n.genesisDoc
}
// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
func (n *nodeImpl) ProxyApp() proxy.AppConns {
return n.proxyApp
}
// Config returns the Node's config.
func (n *nodeImpl) Config() *cfg.Config {
return n.config
}
// EventSinks returns the Node's event indexing sinks.
func (n *nodeImpl) EventSinks() []indexer.EventSink {
return n.eventSinks
// RPCEnvironment makes sure RPC has all the objects it needs to operate.
func (n *nodeImpl) RPCEnvironment() *rpccore.Environment {
return n.rpcEnv
}
//------------------------------------------------------------------------------

View File

@@ -513,36 +513,50 @@ func TestNodeSetEventSink(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
n := getTestNode(t, config, log.TestingLogger())
logger := log.TestingLogger()
setupTest := func(t *testing.T, conf *cfg.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(logger)
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.KV, n.eventSinks[0].Type())
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
require.NoError(t, err)
indexService, eventSinks, err := createAndStartIndexerService(config,
cfg.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
return eventSinks
}
eventSinks := setupTest(t, config)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.KV, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null"}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null", "kv"}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(config, log.TestingLogger())
ns, err := newDefaultNode(config, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("unsupported event sink type"), err)
config.TxIndex.Indexer = []string{}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(config, log.TestingLogger())
ns, err = newDefaultNode(config, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
@@ -550,46 +564,46 @@ func TestNodeSetEventSink(t *testing.T) {
config.TxIndex.Indexer = []string{"psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
n.OnStop()
eventSinks = setupTest(t, config)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql", "kv"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
eventSinks = setupTest(t, config)
assert.Equal(t, 2, len(eventSinks))
// we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks.
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()
config.TxIndex.Indexer = []string{"kv", "psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
eventSinks = setupTest(t, config)
assert.Equal(t, 2, len(eventSinks))
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
config.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
_, err = newDefaultNode(config, logger)
require.Error(t, err)
assert.Equal(t, e, err)
config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
_, err = newDefaultNode(config, logger)
require.Error(t, err)
assert.Equal(t, e, err)
}

View File

@@ -649,7 +649,7 @@ func createPEXReactorV2(
logger log.Logger,
peerManager *p2p.PeerManager,
router *p2p.Router,
) (*pex.ReactorV2, error) {
) (service.Service, error) {
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
if err != nil {

View File

@@ -2,6 +2,7 @@ package local
import (
"context"
"errors"
"fmt"
"time"
@@ -46,15 +47,15 @@ type Local struct {
// NodeService describes the portion of the node interface that the
// local RPC client constructor needs to build a local client.
type NodeService interface {
ConfigureRPC() (*rpccore.Environment, error)
RPCEnvironment() *rpccore.Environment
EventBus() *types.EventBus
}
// New configures a client that calls the Node directly.
func New(node NodeService) (*Local, error) {
env, err := node.ConfigureRPC()
if err != nil {
return nil, err
env := node.RPCEnvironment()
if env == nil {
return nil, errors.New("rpc is nil")
}
return &Local{
EventBus: node.EventBus(),

View File

@@ -36,6 +36,10 @@ func (env *Environment) NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, e
// UnsafeDialSeeds dials the given seeds (comma-separated id@IP:PORT).
func (env *Environment) UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) {
if env.P2PPeers == nil {
return nil, errors.New("peer management system does not support this operation")
}
if len(seeds) == 0 {
return &ctypes.ResultDialSeeds{}, fmt.Errorf("%w: no seeds provided", ctypes.ErrInvalidRequest)
}
@@ -53,6 +57,10 @@ func (env *Environment) UnsafeDialPeers(
peers []string,
persistent, unconditional, private bool) (*ctypes.ResultDialPeers, error) {
if env.P2PPeers == nil {
return nil, errors.New("peer management system does not support this operation")
}
if len(peers) == 0 {
return &ctypes.ResultDialPeers{}, fmt.Errorf("%w: no peers provided", ctypes.ErrInvalidRequest)
}