diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 30218e098..7993419e8 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -7,5 +7,4 @@ # global owners are only requested if there isn't a more specific # codeowner specified below. For this reason, the global codeowners # are often repeated in package-level definitions. -* @alexanderbez @ebuchman @cmwaters @tessr @tychoish @williambanfield - +* @alexanderbez @ebuchman @cmwaters @tessr @tychoish @williambanfield @creachadair diff --git a/UPGRADING.md b/UPGRADING.md index ef0a29fb4..e53c34c29 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -24,10 +24,10 @@ This guide provides instructions for upgrading to specific versions of Tendermin * Added `--mode` flag and `mode` config variable on `config.toml` for setting Mode of the Node: `full` | `validator` | `seed` (default: `full`) [ADR-52](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-052-tendermint-mode.md) - + * `BootstrapPeers` has been added as part of the new p2p stack. This will eventually replace `Seeds`. Bootstrap peers are connected with on startup if needed for peer discovery. Unlike - persistent peers, there's no guarantee that the node will remain connected with these peers. + persistent peers, there's no gaurantee that the node will remain connected with these peers. * configuration values starting with `priv-validator-` have moved to the new `priv-validator` section, without the `priv-validator-` prefix. @@ -35,6 +35,27 @@ This guide provides instructions for upgrading to specific versions of Tendermin * The fast sync process as well as the blockchain package and service has all been renamed to block sync +### Key Format Changes + +The format of all tendermint on-disk database keys changes in +0.35. Upgrading nodes must either re-sync all data or run a migration +script provided in this release. The script located in +`github.com/tendermint/tendermint/scripts/keymigrate/migrate.go` +provides the function `Migrate(context.Context, db.DB)` which you can +operationalize as makes sense for your deployment. + +For ease of use the `tendermint` command includes a CLI version of the +migration script, which you can invoke, as in: + + tendermint key-migrate + +This reads the configuration file as normal and allows the +`--db-backend` and `--db-dir` flags to change database operations as +needed. + +The migration operation is idempotent and can be run more than once, +if needed. + ### CLI Changes * You must now specify the node mode (validator|full|seed) in `tendermint init [mode]` @@ -66,7 +87,7 @@ are: - `blockchain` - `evidence` -Accordingly, the space `node` package was changed to reduce access to +Accordingly, the `node` package was changed to reduce access to tendermint internals: applications that use tendermint as a library will need to change to accommodate these changes. Most notably: @@ -81,6 +102,16 @@ will need to change to accommodate these changes. Most notably: Mark gRPC in the RPC layer as deprecated and to be removed in 0.36. +### Support for Custom Reactor and Mempool Implementations + +The changes to p2p layer removed existing support for custom +reactors. Based on our understanding of how this functionality was +used, the introduction of the prioritized mempool covers nearly all of +the use cases for custom reactors. If you are currently running custom +reactors and mempools and are having trouble seeing the migration path +for your project please feel free to reach out to the Tendermint Core +development team directly. + ## v0.34.0 **Upgrading to Tendermint 0.34 requires a blockchain restart.** @@ -234,8 +265,8 @@ Other user-relevant changes include: * The old `lite` package was removed; the new light client uses the `light` package. * The `Verifier` was broken up into two pieces: - * Core verification logic (pure `VerifyX` functions) - * `Client` object, which represents the complete light client + * Core verification logic (pure `VerifyX` functions) + * `Client` object, which represents the complete light client * The new light clients stores headers & validator sets as `LightBlock`s * The RPC client can be found in the `/rpc` directory. * The HTTP(S) proxy is located in the `/proxy` directory. @@ -367,12 +398,12 @@ Evidence Params has been changed to include duration. ### Go API * `libs/common` has been removed in favor of specific pkgs. - * `async` - * `service` - * `rand` - * `net` - * `strings` - * `cmap` + * `async` + * `service` + * `rand` + * `net` + * `strings` + * `cmap` * removal of `errors` pkg ### RPC Changes @@ -441,9 +472,9 @@ Prior to the update, suppose your `ResponseDeliverTx` look like: ```go abci.ResponseDeliverTx{ Tags: []kv.Pair{ - {Key: []byte("sender"), Value: []byte("foo")}, - {Key: []byte("recipient"), Value: []byte("bar")}, - {Key: []byte("amount"), Value: []byte("35")}, + {Key: []byte("sender"), Value: []byte("foo")}, + {Key: []byte("recipient"), Value: []byte("bar")}, + {Key: []byte("amount"), Value: []byte("35")}, } } ``` @@ -462,14 +493,14 @@ the following `Events`: ```go abci.ResponseDeliverTx{ Events: []abci.Event{ - { - Type: "transfer", - Attributes: kv.Pairs{ - {Key: []byte("sender"), Value: []byte("foo")}, - {Key: []byte("recipient"), Value: []byte("bar")}, - {Key: []byte("amount"), Value: []byte("35")}, - }, - } + { + Type: "transfer", + Attributes: kv.Pairs{ + {Key: []byte("sender"), Value: []byte("foo")}, + {Key: []byte("recipient"), Value: []byte("bar")}, + {Key: []byte("amount"), Value: []byte("35")}, + }, + } } ``` @@ -517,9 +548,9 @@ In this case, the WS client will receive an error with description: "jsonrpc": "2.0", "id": "{ID}#event", "error": { - "code": -32000, - "msg": "Server error", - "data": "subscription was canceled (reason: client is not pulling messages fast enough)" // or "subscription was canceled (reason: Tendermint exited)" + "code": -32000, + "msg": "Server error", + "data": "subscription was canceled (reason: client is not pulling messages fast enough)" // or "subscription was canceled (reason: Tendermint exited)" } } @@ -725,9 +756,9 @@ just the `Data` field set: ```go []ProofOp{ - ProofOp{ - Data: , - } + ProofOp{ + Data: , + } } ``` diff --git a/cmd/tendermint/commands/key_migrate.go b/cmd/tendermint/commands/key_migrate.go new file mode 100644 index 000000000..739af4a7d --- /dev/null +++ b/cmd/tendermint/commands/key_migrate.go @@ -0,0 +1,64 @@ +package commands + +import ( + "context" + "fmt" + + "github.com/spf13/cobra" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/scripts/keymigrate" +) + +func MakeKeyMigrateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "key-migrate", + Short: "Run Database key migration", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + contexts := []string{ + // this is ordered to put the + // (presumably) biggest/most important + // subsets first. + "blockstore", + "state", + "peerstore", + "tx_index", + "evidence", + "light", + } + + for idx, dbctx := range contexts { + logger.Info("beginning a key migration", + "dbctx", dbctx, + "num", idx+1, + "total", len(contexts), + ) + + db, err := cfg.DefaultDBProvider(&cfg.DBContext{ + ID: dbctx, + Config: config, + }) + + if err != nil { + return fmt.Errorf("constructing database handle: %w", err) + } + + if err = keymigrate.Migrate(ctx, db); err != nil { + return fmt.Errorf("running migration for context %q: %w", + dbctx, err) + } + } + + logger.Info("completed database migration successfully") + + return nil + }, + } + + // allow database info to be overridden via cli + addDBFlags(cmd) + + return cmd +} diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index 1c68fcffe..97d6197a2 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -83,7 +83,10 @@ func AddNodeFlags(cmd *cobra.Command) { config.Consensus.CreateEmptyBlocksInterval.String(), "the possible interval between empty blocks") - // db flags + addDBFlags(cmd) +} + +func addDBFlags(cmd *cobra.Command) { cmd.Flags().String( "db-backend", config.DBBackend, diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index 810e35327..c006c297d 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -29,6 +29,7 @@ func main() { cmd.GenNodeKeyCmd, cmd.VersionCmd, cmd.InspectCmd, + cmd.MakeKeyMigrateCommand(), debug.DebugCmd, cli.NewCompletionCmd(rootCmd, true), ) diff --git a/docs/architecture/README.md b/docs/architecture/README.md index a4e326274..7025a72f6 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -97,3 +97,4 @@ Note the context/background should be written in the present tense. - [ADR-041: Proposer-Selection-via-ABCI](./adr-041-proposer-selection-via-abci.md) - [ADR-045: ABCI-Evidence](./adr-045-abci-evidence.md) - [ADR-057: RPC](./adr-057-RPC.md) +- [ADR-069: Node Initialization](./adr-069-flexible-node-initialization.md) diff --git a/docs/architecture/adr-069-flexible-node-intitalization.md b/docs/architecture/adr-069-flexible-node-intitalization.md new file mode 100644 index 000000000..ec66725be --- /dev/null +++ b/docs/architecture/adr-069-flexible-node-intitalization.md @@ -0,0 +1,273 @@ +# ADR 069: Flexible Node Initialization + +## Changlog + +- 2021-06-09: Initial Draft (@tychoish) + +- 2021-07-21: Major Revision (@tychoish) + +## Status + +Proposed. + +## Context + +In an effort to support [Go-API-Stability](./adr-060-go-api-stability.md), +during the 0.35 development cycle, we have attempted to reduce the the API +surface area by moving most of the interface of the `node` package into +unexported functions, as well as moving the reactors to an `internal` +package. Having this coincide with the 0.35 release made a lot of sense +because these interfaces were _already_ changing as a result of the `p2p` +[refactor](./adr-061-p2p-refactor-scope.md), so it made sense to think a bit +more about how tendermint exposes this API. + +While the interfaces of the P2P layer and most of the node package are already +internalized, this precludes some operational patterns that are important to +users who use tendermint as a library. Specifically, introspecting the +tendermint node service and replacing components is not supported in the latest +version of the code, and some of these use cases would require maintaining a +vendor copy of the code. Adding these features requires rather extensive +(internal/implementation) changes to the `node` and `rpc` packages, and this +ADR describes a model for changing the way that tendermint nodes initialize, in +service of providing this kind of functionality. + +We consider node initialization, because the current implemention +provides strong connections between all components, as well as between +the components of the node and the RPC layer, and being able to think +about the interactions of these components will help enable these +features and help define the requirements of the node package. + +## Alternative Approaches + +These alternatives are presented to frame the design space and to +contextualize the decision in terms of product requirements. These +ideas are not inherently bad, and may even be possible or desireable +in the (distant) future, and merely provide additional context for how +we, in the moment came to our decision(s). + +### Do Nothing + +The current implementation is functional and sufficient for the vast +majority of use cases (e.g., all users of the Cosmos-SDK as well as +anyone who runs tendermint and the ABCI application in separate +processes). In the current implementation, and even previous versions, +modifying node initialization or injecting custom components required +copying most of the `node` package, which required such users +to maintain a vendored copy of tendermint. + +While this is (likely) not tenable in the long term, as users do want +more modularity, and the current service implementation is brittle and +difficult to maintain, in the short term it may be possible to delay +implementation somewhat. Eventually, however, we will need to make the +`node` package easier to maintain and reason about. + +### Generic Service Pluggability + +One possible system design would export interfaces (in the Golang +sense) for all components of the system, to permit runtime dependency +injection of all components in the system, so that users can compose +tendermint nodes of arbitrary user-supplied components. + +Although this level of customization would provide benefits, it would be a huge +undertaking (particularly with regards to API design work) that we do not have +scope for at the moment. Eventually providing support for some kinds of +pluggability may be useful, so the current solution does not explicitly +foreclose the possibility of this alternative. + +### Abstract Dependency Based Startup and Shutdown + +The main proposal in this document makes tendermint node initialization simpler +and more abstract, but the system lacks a number of +features which daemon/service initialization could provide, such as a +system allowing the authors of services to control initialization and shutdown order +of components using dependency relationships. + +Such a system could work by allowing services to declare +initialization order dependencies to other reactors (by ID, perhaps) +so that the node could decide the initialization based on the +dependencies declared by services rather than requiring the node to +encode this logic directly. + +This level of configuration is probably more complicated than is needed. Given +that the authors of components in the current implementation of tendermint +already *do* need to know about other components, a dependency-based system +would probably be overly-abstract at this stage. + +## Decisions + +- To the greatest extent possible, factor the code base so that + packages are responsible for their own initialization, and minimize + the amount of code in the `node` package itself. + +- As a design goal, reduce direct coupling and dependencies between + components in the implementation of `node`. + +- Begin iterating on a more-flexible internal framework for + initializing tendermint nodes to make the initatilization process + less hard-coded by the implementation of the node objects. + + - Reactors should not need to expose their interfaces *within* the + implementation of the node type + + - This refactoring should be entirely opaque to users. + + - These node initialization changes should not require a + reevaluation of the `service.Service` or a generic initialization + orchestration framework. + +- Do not proactively provide a system for injecting + components/services within a tendtermint node, though make it + possible to retrofit this kind of plugability in the future if + needed. + +- Prioritize implementation of p2p-based statesync reactor to obviate + need for users to inject a custom state-sync provider. + +## Detailed Design + +The [current +nodeImpl](https://github.com/tendermint/tendermint/blob/master/node/node.go#L47) +includes direct references to the implementations of each of the +reactors, which should be replaced by references to `service.Service` +objects. This will require moving construction of the [rpc +service](https://github.com/tendermint/tendermint/blob/master/node/node.go#L771) +into the constructor of +[makeNode](https://github.com/tendermint/tendermint/blob/master/node/node.go#L126). One +possible implementation of this would be to eliminate the current +`ConfigureRPC` method on the node package and instead [configure it +here](https://github.com/tendermint/tendermint/pull/6798/files#diff-375d57e386f20eaa5f09f02bb9d28bfc48ac3dca18d0325f59492208219e5618R441). + +To avoid adding complexity to the `node` package, we will add a +composite service implementation to the `service` package +that implements `service.Service` and is composed of a sequence of +underlying `service.Service` objects and handles their +startup/shutdown in the specified sequential order. + +Consensus, blocksync (*née* fast sync), and statesync all depend on +each other, and have significant initialization dependencies that are +presently encoded in the `node` package. As part of this change, a +new package/component (likely named `blocks` located at +`internal/blocks`) will encapsulate the initialization of these block +management areas of the code. + +### Injectable Component Option + +This section briefly describes a possible implementation for +user-supplied services running within a node. This should not be +implemented unless user-supplied components are a hard requirement for +a user. + +In order to allow components to be replaced, a new public function +will be added to the public interface of `node` with a signature that +resembles the following: + +```go +func NewWithServices(conf *config.Config, + logger log.Logger, + cf proxy.ClientCreator, + gen *types.GenesisDoc, + srvs []service.Service, +) (service.Service, error) { +``` + +The `service.Service` objects will be initialized in the order supplied, after +all pre-configured/default services have started (and shut down in reverse +order). The given services may implement additional interfaces, allowing them +to replace specific default services. `NewWithServices` will validate input +service lists with the following rules: + +- None of the services may already be running. +- The caller may not supply more than one replacement reactor for a given + default service type. + +If callers violate any of these rules, `NewWithServices` will return +an error. To retract support for this kind of operation in the future, +the function can be modified to *always* return an error. + +## Consequences + +### Positive + +- The node package will become easier to maintain. + +- It will become easier to add additional services within tendermint + nodes. + +- It will become possible to replace default components in the node + package without vendoring the tendermint repo and modifying internal + code. + +- The current end-to-end (e2e) test suite will be able to prevent any + regressions, and the new functionality can be thoroughly unit tested. + +- The scope of this project is very narrow, which minimizes risk. + +### Negative + +- This increases our reliance on the `service.Service` interface which + is probably not an interface that we want to fully commit to. + +- This proposal implements a fairly minimal set of functionality and + leaves open the possibility for many additional features which are + not included in the scope of this proposal. + +### Neutral + +N/A + +## Open Questions + +- To what extent does this new initialization framework need to accommodate + the legacy p2p stack? Would it be possible to delay a great deal of this + work to the 0.36 cycle to avoid this complexity? + + - Answer: _depends on timing_, and the requirement to ship pluggable reactors in 0.35. + +- Where should additional public types be exported for the 0.35 + release? + + Related to the general project of API stabilization we want to deprecate + the `types` package, and move its contents into a new `pkg` hierarchy; + however, the design of the `pkg` interface is currently underspecified. + If `types` is going to remain for the 0.35 release, then we should consider + the impact of using multiple organizing modalities for this code within a + single release. + +## Future Work + +- Improve or simplify the `service.Service` interface. There are some + pretty clear limitations with this interface as written (there's no + way to timeout slow startup or shut down, the cycle between the + `service.BaseService` and `service.Service` implementations is + troubling, the default panic in `OnReset` seems troubling.) + +- As part of the refactor of `service.Service` have all services/nodes + respect the lifetime of a `context.Context` object, and avoid the + current practice of creating `context.Context` objects in p2p and + reactor code. This would be required for in-process multi-tenancy. + +- Support explicit dependencies between components and allow for + parallel startup, so that different reactors can startup at the same + time, where possible. + +## References + +- [this + branch](https://github.com/tendermint/tendermint/tree/tychoish/scratch-node-minimize) + contains experimental work in the implementation of the node package + to unwind some of the hard dependencies between components. + +- [the component + graph](https://peter.bourgon.org/go-for-industrial-programming/#the-component-graph) + as a framing for internal service construction. + +## Appendix + +### Dependencies + +There's a relationship between the blockchain and consensus reactor +described by the following dependency graph makes replacing some of +these components more difficult relative to other reactors or +components. + +![consensus blockchain dependency graph](./img/consensus_blockchain.png) diff --git a/docs/architecture/img/consensus_blockchain.png b/docs/architecture/img/consensus_blockchain.png new file mode 100644 index 000000000..dd0f4daa8 Binary files /dev/null and b/docs/architecture/img/consensus_blockchain.png differ diff --git a/go.mod b/go.mod index a23bc5167..84c6f43ac 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/vektra/mockery/v2 v2.9.0 golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 - google.golang.org/grpc v1.39.1 + google.golang.org/grpc v1.40.0 gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect pgregory.net/rapid v0.4.7 ) diff --git a/go.sum b/go.sum index 3e20b60ab..81abd8a00 100644 --- a/go.sum +++ b/go.sum @@ -1368,8 +1368,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= -google.golang.org/grpc v1.39.1 h1:f37vZbBVTiJ6jKG5mWz8ySOBxNqy6ViPgyhSdVnxF3E= -google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= +google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q= +google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/libs/bytes/bytes.go b/libs/bytes/bytes.go index cfb7a8db2..dd8e39737 100644 --- a/libs/bytes/bytes.go +++ b/libs/bytes/bytes.go @@ -27,15 +27,22 @@ func (bz *HexBytes) Unmarshal(data []byte) error { return nil } -// MarshalJSON implements the json.Marshaler interface. The hex bytes is a -// quoted hexadecimal encoded string. +// MarshalJSON implements the json.Marshaler interface. The encoding is a JSON +// quoted string of hexadecimal digits. func (bz HexBytes) MarshalJSON() ([]byte, error) { - s := strings.ToUpper(hex.EncodeToString(bz)) - jbz := make([]byte, len(s)+2) - jbz[0] = '"' - copy(jbz[1:], s) - jbz[len(jbz)-1] = '"' - return jbz, nil + size := hex.EncodedLen(len(bz)) + 2 // +2 for quotation marks + buf := make([]byte, size) + hex.Encode(buf[1:], []byte(bz)) + buf[0] = '"' + buf[size-1] = '"' + + // Ensure letter digits are capitalized. + for i := 1; i < size-1; i++ { + if buf[i] >= 'a' && buf[i] <= 'f' { + buf[i] = 'A' + (buf[i] - 'a') + } + } + return buf, nil } // UnmarshalJSON implements the json.Umarshaler interface. diff --git a/libs/bytes/bytes_test.go b/libs/bytes/bytes_test.go index db882f1c1..6a9ca7c3d 100644 --- a/libs/bytes/bytes_test.go +++ b/libs/bytes/bytes_test.go @@ -37,6 +37,7 @@ func TestJSONMarshal(t *testing.T) { {[]byte(``), `{"B1":"","B2":""}`}, {[]byte(`a`), `{"B1":"YQ==","B2":"61"}`}, {[]byte(`abc`), `{"B1":"YWJj","B2":"616263"}`}, + {[]byte("\x1a\x2b\x3c"), `{"B1":"Gis8","B2":"1A2B3C"}`}, } for i, tc := range cases { diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 54a030fe8..7548470b5 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -231,34 +231,45 @@ func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error { return err } var qs string + if args.Query != nil { qs = args.Query.String() } - s.mtx.RLock() - clientSubscriptions, ok := s.subscriptions[args.Subscriber] - if args.ID != "" { - qs, ok = clientSubscriptions[args.ID] + clientSubscriptions, err := func() (map[string]string, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() - if ok && args.Query == nil { - var err error - args.Query, err = query.New(qs) - if err != nil { - return err + clientSubscriptions, ok := s.subscriptions[args.Subscriber] + if args.ID != "" { + qs, ok = clientSubscriptions[args.ID] + + if ok && args.Query == nil { + var err error + args.Query, err = query.New(qs) + if err != nil { + return nil, err + } } + } else if qs != "" { + args.ID, ok = clientSubscriptions[qs] } - } else if qs != "" { - args.ID, ok = clientSubscriptions[qs] - } - s.mtx.RUnlock() - if !ok { - return ErrSubscriptionNotFound + if !ok { + return nil, ErrSubscriptionNotFound + } + + return clientSubscriptions, nil + }() + + if err != nil { + return err } select { case s.cmds <- cmd{op: unsub, clientID: args.Subscriber, query: args.Query, subscription: &Subscription{id: args.ID}}: s.mtx.Lock() + defer s.mtx.Unlock() delete(clientSubscriptions, args.ID) delete(clientSubscriptions, qs) @@ -266,7 +277,6 @@ func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error { if len(clientSubscriptions) == 0 { delete(s.subscriptions, args.Subscriber) } - s.mtx.Unlock() return nil case <-ctx.Done(): return ctx.Err() @@ -288,8 +298,10 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { select { case s.cmds <- cmd{op: unsub, clientID: clientID}: s.mtx.Lock() + defer s.mtx.Unlock() + delete(s.subscriptions, clientID) - s.mtx.Unlock() + return nil case <-ctx.Done(): return ctx.Err() diff --git a/node/node.go b/node/node.go index 2dca7b82c..b6ea1e974 100644 --- a/node/node.go +++ b/node/node.go @@ -18,7 +18,6 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" cs "github.com/tendermint/tendermint/internal/consensus" - "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/pex" @@ -37,7 +36,6 @@ import ( grpccore "github.com/tendermint/tendermint/rpc/grpc" rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" ) @@ -71,16 +69,12 @@ type nodeImpl struct { mempool mempool.Mempool stateSync bool // whether the node should state sync on startup stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - consensusState *cs.State // latest consensus state consensusReactor *cs.Reactor // for participating in the consensus - pexReactor *pex.Reactor // for exchanging peer addresses - pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses - evidenceReactor *evidence.Reactor - evidencePool *evidence.Pool // tracking evidence - proxyApp proxy.AppConns // connection to the application + pexReactor service.Service // for exchanging peer addresses + evidenceReactor service.Service rpcListeners []net.Listener // rpc servers - eventSinks []indexer.EventSink - indexerService *indexer.Service + indexerService service.Service + rpcEnv *rpccore.Environment prometheusSrv *http.Server } @@ -371,46 +365,43 @@ func makeNode(config *cfg.Config, // Note we currently use the addrBook regardless at least for AddOurAddress var ( - pexReactor *pex.Reactor - pexReactorV2 *pex.ReactorV2 - sw *p2p.Switch - addrBook pex.AddrBook + pexReactor service.Service + sw *p2p.Switch + addrBook pex.AddrBook ) pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) - if config.P2P.PexReactor { - if config.P2P.DisableLegacy { - addrBook = nil - pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) - if err != nil { - return nil, err - } - } else { - // setup Transport and Switch - sw = createSwitch( - config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, - stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, - ) - - err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) - } - - err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) - } - - addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) - if err != nil { - return nil, fmt.Errorf("could not create addrbook: %w", err) - } - - pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + if config.P2P.DisableLegacy { + addrBook = nil + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) + if err != nil { + return nil, err } + } else { + // setup Transport and Switch + sw = createSwitch( + config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, + stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, + ) + + err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) + } + + err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) + } + + addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) + if err != nil { + return nil, fmt.Errorf("could not create addrbook: %w", err) + } + + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) } if config.RPC.PprofListenAddress != "" { @@ -438,19 +429,37 @@ func makeNode(config *cfg.Config, bcReactor: bcReactor, mempoolReactor: mpReactor, mempool: mp, - consensusState: csState, consensusReactor: csReactor, stateSyncReactor: stateSyncReactor, stateSync: stateSync, pexReactor: pexReactor, - pexReactorV2: pexReactorV2, evidenceReactor: evReactor, - evidencePool: evPool, - proxyApp: proxyApp, indexerService: indexerService, eventBus: eventBus, - eventSinks: eventSinks, + + rpcEnv: &rpccore.Environment{ + ProxyAppQuery: proxyApp.Query(), + ProxyAppMempool: proxyApp.Mempool(), + + StateStore: stateStore, + BlockStore: blockStore, + EvidencePool: evPool, + ConsensusState: csState, + P2PPeers: sw, + BlockSyncReactor: bcReactor.(cs.BlockSyncReactor), + + GenDoc: genDoc, + EventSinks: eventSinks, + ConsensusReactor: csReactor, + EventBus: eventBus, + Mempool: mp, + Logger: logger.With("module", "rpc"), + Config: *config.RPC, + }, } + + node.rpcEnv.P2PTransport = node + node.BaseService = *service.NewBaseService(logger, "Node", node) return node, nil @@ -483,25 +492,6 @@ func makeSeedNode(config *cfg.Config, p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID) p2pLogger := logger.With("module", "p2p") transport := createTransport(p2pLogger, config) - sw := createSwitch( - config, transport, p2pMetrics, nil, nil, - nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, - ) - - err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) - } - - err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - if err != nil { - return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) - } - - addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) - if err != nil { - return nil, fmt.Errorf("could not create addrbook: %w", err) - } peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID) if err != nil { @@ -515,8 +505,9 @@ func makeSeedNode(config *cfg.Config, } var ( - pexReactor *pex.Reactor - pexReactorV2 *pex.ReactorV2 + pexReactor service.Service + sw *p2p.Switch + addrBook pex.AddrBook ) // add the pex reactor @@ -526,11 +517,31 @@ func makeSeedNode(config *cfg.Config, pexCh := pex.ChannelDescriptor() transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh}) if config.P2P.DisableLegacy { - pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router) + pexReactor, err = createPEXReactorV2(config, logger, peerManager, router) if err != nil { return nil, err } } else { + sw = createSwitch( + config, transport, p2pMetrics, nil, nil, + nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger, + ) + + err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) + } + + err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) + } + + addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) + if err != nil { + return nil, fmt.Errorf("could not create addrbook: %w", err) + } + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) } @@ -553,8 +564,7 @@ func makeSeedNode(config *cfg.Config, peerManager: peerManager, router: router, - pexReactor: pexReactor, - pexReactorV2: pexReactorV2, + pexReactor: pexReactor, } node.BaseService = *service.NewBaseService(logger, "SeedNode", node) @@ -595,23 +605,22 @@ func (n *nodeImpl) OnStart() error { } n.isListening = true - n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy) if n.config.P2P.DisableLegacy { - err = n.router.Start() + if err = n.router.Start(); err != nil { + return err + } } else { // Add private IDs to addrbook to block those peers being added n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) - err = n.sw.Start() - } - if err != nil { - return err + if err = n.sw.Start(); err != nil { + return err + } } if n.config.Mode != cfg.ModeSeed { if n.config.BlockSync.Version == cfg.BlockSyncV0 { - // Start the real blockchain reactor separately since the switch uses the shim. if err := n.bcReactor.Start(); err != nil { return err } @@ -638,8 +647,8 @@ func (n *nodeImpl) OnStart() error { } } - if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { - if err := n.pexReactorV2.Start(); err != nil { + if n.config.P2P.DisableLegacy { + if err := n.pexReactor.Start(); err != nil { return err } } else { @@ -648,7 +657,6 @@ func (n *nodeImpl) OnStart() error { if err != nil { return fmt.Errorf("could not dial peers from persistent-peers field: %w", err) } - } // Run state sync @@ -723,10 +731,8 @@ func (n *nodeImpl) OnStop() { } } - if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil { - if err := n.pexReactorV2.Stop(); err != nil { - n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) - } + if err := n.pexReactor.Stop(); err != nil { + n.Logger.Error("failed to stop the PEX v2 reactor", "err", err) } if n.config.P2P.DisableLegacy { @@ -767,55 +773,23 @@ func (n *nodeImpl) OnStop() { } } -// ConfigureRPC makes sure RPC has all the objects it needs to operate. -func (n *nodeImpl) ConfigureRPC() (*rpccore.Environment, error) { - rpcCoreEnv := rpccore.Environment{ - ProxyAppQuery: n.proxyApp.Query(), - ProxyAppMempool: n.proxyApp.Mempool(), - - StateStore: n.stateStore, - BlockStore: n.blockStore, - EvidencePool: n.evidencePool, - ConsensusState: n.consensusState, - P2PPeers: n.sw, - P2PTransport: n, - - GenDoc: n.genesisDoc, - EventSinks: n.eventSinks, - ConsensusReactor: n.consensusReactor, - EventBus: n.eventBus, - Mempool: n.mempool, - - Logger: n.Logger.With("module", "rpc"), - - Config: *n.config.RPC, - BlockSyncReactor: n.bcReactor.(cs.BlockSyncReactor), - } +func (n *nodeImpl) startRPC() ([]net.Listener, error) { if n.config.Mode == cfg.ModeValidator { pubKey, err := n.privValidator.GetPubKey(context.TODO()) if pubKey == nil || err != nil { return nil, fmt.Errorf("can't get pubkey: %w", err) } - rpcCoreEnv.PubKey = pubKey + n.rpcEnv.PubKey = pubKey } - if err := rpcCoreEnv.InitGenesisChunks(); err != nil { - return nil, err - } - - return &rpcCoreEnv, nil -} - -func (n *nodeImpl) startRPC() ([]net.Listener, error) { - env, err := n.ConfigureRPC() - if err != nil { + if err := n.rpcEnv.InitGenesisChunks(); err != nil { return nil, err } listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") - routes := env.GetRoutes() + routes := n.rpcEnv.GetRoutes() if n.config.RPC.Unsafe { - env.AddUnsafe(routes) + n.rpcEnv.AddUnsafe(routes) } config := rpcserver.DefaultConfig() @@ -912,7 +886,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) { return nil, err } go func() { - if err := grpccore.StartGRPCServer(env, listener); err != nil { + if err := grpccore.StartGRPCServer(n.rpcEnv, listener); err != nil { n.Logger.Error("Error starting gRPC server", "err", err) } }() @@ -945,46 +919,16 @@ func (n *nodeImpl) startPrometheusServer(addr string) *http.Server { return srv } -// Switch returns the Node's Switch. -func (n *nodeImpl) Switch() *p2p.Switch { - return n.sw -} - -// BlockStore returns the Node's BlockStore. -func (n *nodeImpl) BlockStore() *store.BlockStore { - return n.blockStore -} - -// ConsensusState returns the Node's ConsensusState. -func (n *nodeImpl) ConsensusState() *cs.State { - return n.consensusState -} - // ConsensusReactor returns the Node's ConsensusReactor. func (n *nodeImpl) ConsensusReactor() *cs.Reactor { return n.consensusReactor } -// MempoolReactor returns the Node's mempool reactor. -func (n *nodeImpl) MempoolReactor() service.Service { - return n.mempoolReactor -} - // Mempool returns the Node's mempool. func (n *nodeImpl) Mempool() mempool.Mempool { return n.mempool } -// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled. -func (n *nodeImpl) PEXReactor() *pex.Reactor { - return n.pexReactor -} - -// EvidencePool returns the Node's EvidencePool. -func (n *nodeImpl) EvidencePool() *evidence.Pool { - return n.evidencePool -} - // EventBus returns the Node's EventBus. func (n *nodeImpl) EventBus() *types.EventBus { return n.eventBus @@ -1001,19 +945,9 @@ func (n *nodeImpl) GenesisDoc() *types.GenesisDoc { return n.genesisDoc } -// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application. -func (n *nodeImpl) ProxyApp() proxy.AppConns { - return n.proxyApp -} - -// Config returns the Node's config. -func (n *nodeImpl) Config() *cfg.Config { - return n.config -} - -// EventSinks returns the Node's event indexing sinks. -func (n *nodeImpl) EventSinks() []indexer.EventSink { - return n.eventSinks +// RPCEnvironment makes sure RPC has all the objects it needs to operate. +func (n *nodeImpl) RPCEnvironment() *rpccore.Environment { + return n.rpcEnv } //------------------------------------------------------------------------------ diff --git a/node/node_test.go b/node/node_test.go index 16edb4210..64b28c0bb 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -513,36 +513,50 @@ func TestNodeSetEventSink(t *testing.T) { config := cfg.ResetTestRoot("node_app_version_test") defer os.RemoveAll(config.RootDir) - n := getTestNode(t, config, log.TestingLogger()) + logger := log.TestingLogger() + setupTest := func(t *testing.T, conf *cfg.Config) []indexer.EventSink { + eventBus, err := createAndStartEventBus(logger) + require.NoError(t, err) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.KV, n.eventSinks[0].Type()) + genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) + require.NoError(t, err) + + indexService, eventSinks, err := createAndStartIndexerService(config, + cfg.DefaultDBProvider, eventBus, logger, genDoc.ChainID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, indexService.Stop()) }) + return eventSinks + } + + eventSinks := setupTest(t, config) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.KV, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"null"} - n = getTestNode(t, config, log.TestingLogger()) + eventSinks = setupTest(t, config) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.NULL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"null", "kv"} - n = getTestNode(t, config, log.TestingLogger()) + eventSinks = setupTest(t, config) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.NULL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"kvv"} - ns, err := newDefaultNode(config, log.TestingLogger()) + ns, err := newDefaultNode(config, logger) assert.Nil(t, ns) assert.Equal(t, errors.New("unsupported event sink type"), err) config.TxIndex.Indexer = []string{} - n = getTestNode(t, config, log.TestingLogger()) + eventSinks = setupTest(t, config) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.NULL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"psql"} - ns, err = newDefaultNode(config, log.TestingLogger()) + ns, err = newDefaultNode(config, logger) assert.Nil(t, ns) assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err) @@ -550,46 +564,46 @@ func TestNodeSetEventSink(t *testing.T) { config.TxIndex.Indexer = []string{"psql"} config.TxIndex.PsqlConn = psqlConn - n = getTestNode(t, config, log.TestingLogger()) - assert.Equal(t, 1, len(n.eventSinks)) - assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) - n.OnStop() + eventSinks = setupTest(t, config) + + assert.Equal(t, 1, len(eventSinks)) + assert.Equal(t, indexer.PSQL, eventSinks[0].Type()) config.TxIndex.Indexer = []string{"psql", "kv"} config.TxIndex.PsqlConn = psqlConn - n = getTestNode(t, config, log.TestingLogger()) - assert.Equal(t, 2, len(n.eventSinks)) + eventSinks = setupTest(t, config) + + assert.Equal(t, 2, len(eventSinks)) // we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks. - if n.eventSinks[0].Type() == indexer.KV { - assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type()) + if eventSinks[0].Type() == indexer.KV { + assert.Equal(t, indexer.PSQL, eventSinks[1].Type()) } else { - assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) - assert.Equal(t, indexer.KV, n.eventSinks[1].Type()) + assert.Equal(t, indexer.PSQL, eventSinks[0].Type()) + assert.Equal(t, indexer.KV, eventSinks[1].Type()) } - n.OnStop() config.TxIndex.Indexer = []string{"kv", "psql"} config.TxIndex.PsqlConn = psqlConn - n = getTestNode(t, config, log.TestingLogger()) - assert.Equal(t, 2, len(n.eventSinks)) - if n.eventSinks[0].Type() == indexer.KV { - assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type()) + eventSinks = setupTest(t, config) + + assert.Equal(t, 2, len(eventSinks)) + if eventSinks[0].Type() == indexer.KV { + assert.Equal(t, indexer.PSQL, eventSinks[1].Type()) } else { - assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) - assert.Equal(t, indexer.KV, n.eventSinks[1].Type()) + assert.Equal(t, indexer.PSQL, eventSinks[0].Type()) + assert.Equal(t, indexer.KV, eventSinks[1].Type()) } - n.OnStop() var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml") config.TxIndex.Indexer = []string{"psql", "kv", "Kv"} config.TxIndex.PsqlConn = psqlConn - _, err = newDefaultNode(config, log.TestingLogger()) + _, err = newDefaultNode(config, logger) require.Error(t, err) assert.Equal(t, e, err) config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"} config.TxIndex.PsqlConn = psqlConn - _, err = newDefaultNode(config, log.TestingLogger()) + _, err = newDefaultNode(config, logger) require.Error(t, err) assert.Equal(t, e, err) } diff --git a/node/setup.go b/node/setup.go index fb88846f5..ae706ee22 100644 --- a/node/setup.go +++ b/node/setup.go @@ -649,7 +649,7 @@ func createPEXReactorV2( logger log.Logger, peerManager *p2p.PeerManager, router *p2p.Router, -) (*pex.ReactorV2, error) { +) (service.Service, error) { channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128) if err != nil { diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 0663ebf67..d752e6a93 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -2,6 +2,7 @@ package local import ( "context" + "errors" "fmt" "time" @@ -46,15 +47,15 @@ type Local struct { // NodeService describes the portion of the node interface that the // local RPC client constructor needs to build a local client. type NodeService interface { - ConfigureRPC() (*rpccore.Environment, error) + RPCEnvironment() *rpccore.Environment EventBus() *types.EventBus } // New configures a client that calls the Node directly. func New(node NodeService) (*Local, error) { - env, err := node.ConfigureRPC() - if err != nil { - return nil, err + env := node.RPCEnvironment() + if env == nil { + return nil, errors.New("rpc is nil") } return &Local{ EventBus: node.EventBus(), diff --git a/rpc/core/net.go b/rpc/core/net.go index edcf8fffa..8f3e89d77 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -36,6 +36,10 @@ func (env *Environment) NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, e // UnsafeDialSeeds dials the given seeds (comma-separated id@IP:PORT). func (env *Environment) UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { + if env.P2PPeers == nil { + return nil, errors.New("peer management system does not support this operation") + } + if len(seeds) == 0 { return &ctypes.ResultDialSeeds{}, fmt.Errorf("%w: no seeds provided", ctypes.ErrInvalidRequest) } @@ -53,6 +57,10 @@ func (env *Environment) UnsafeDialPeers( peers []string, persistent, unconditional, private bool) (*ctypes.ResultDialPeers, error) { + if env.P2PPeers == nil { + return nil, errors.New("peer management system does not support this operation") + } + if len(peers) == 0 { return &ctypes.ResultDialPeers{}, fmt.Errorf("%w: no peers provided", ctypes.ErrInvalidRequest) }