Compare commits

...

42 Commits

Author SHA1 Message Date
tycho garen
37cce2542a merge 2022-07-08 13:14:01 -04:00
tycho garen
1cc4509029 fix: merge error 2022-06-09 09:42:13 -04:00
tycho garen
9456b7975f Merge remote-tracking branch 'origin/master' into main/libp2p 2022-06-09 09:21:41 -04:00
tycho garen
2994fe00e4 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-26 05:06:38 -04:00
Sam Kleinman
86cf79a9d9 p2p: libp2p constructor scaffolding (#8592) 2022-05-24 04:44:26 -04:00
tycho garen
fff97326a1 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-23 08:32:12 -04:00
Sam Kleinman
6dd0e8d83a p2p: use more appropriate context (#8563) 2022-05-16 21:56:17 -04:00
Sam Kleinman
2cecf7f9c7 p2p: cache libp2p streams (#8562) 2022-05-16 10:31:55 -04:00
tycho garen
4452949c81 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-16 09:44:07 -04:00
Sam Kleinman
ffa7af3c36 p2p: collect errors from channel (#8544) 2022-05-16 09:40:29 -04:00
tycho garen
8e9614d625 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-16 09:09:57 -04:00
Sam Kleinman
9db41016ab p2p: avoid sending errors after context is canceled (#8548) 2022-05-14 08:28:09 -04:00
tycho garen
b43336b706 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-13 11:17:40 -04:00
Sam Kleinman
b7ccee6240 p2p: simple request and response framework (#8500) 2022-05-11 14:43:54 -04:00
tycho garen
d30a1821cc Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-09 10:34:22 -04:00
Sam Kleinman
f1c9a56d57 p2p: base implementation of libp2p channel (#8476) 2022-05-09 10:34:02 -04:00
tycho garen
2bc4f12aae Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-06 10:26:20 -04:00
tycho garen
0d844825f1 Merge branch 'master' into main/libp2p 2022-05-06 10:25:10 -04:00
M. J. Fromberger
379136a55c keymigrate: improve filtering for legacy transaction hashes (#8466)
This is a follow-up to #8352. The check for legacy evidence keys is only based
on the prefix of the key. Hashes, which are unprefixed, could easily have this
form and be misdiagnosed.

Because the conversion for evidence checks the key structure, this should not
cause corruption. The probability that a hash is a syntactically valid evidence
key is negligible. The tool will report an error rather than storing bad data.
But this does mean that such transaction hashes could cause the migration to
stop and report an error before it is complete.

To ensure we convert all the data, refine the legacy key check to filter these
keys more precisely. Update the test cases to exercise this condition.

* Update upgrading instructions.
2022-05-06 10:25:00 -04:00
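
A minimal sketch of the kind of structural check described above, with hypothetical names and layout (the `evidence:` prefix and the 8-byte-height-plus-32-byte-hash shape are illustrative assumptions, not the actual keymigrate encoding):

```go
package main

import (
	"bytes"
	"fmt"
)

// isLegacyEvidenceKey sketches the refined filter: beyond matching the
// prefix, the remainder of the key must have the expected structure, so
// a raw transaction hash whose leading bytes happen to match the prefix
// is not misclassified as evidence.
func isLegacyEvidenceKey(key []byte) bool {
	prefix := []byte("evidence:") // assumption: placeholder for the real prefix
	if !bytes.HasPrefix(key, prefix) {
		return false
	}
	rest := key[len(prefix):]
	// assumption: a legacy evidence key encodes an 8-byte height
	// followed by a 32-byte hash.
	return len(rest) == 8+32
}

func main() {
	hash := bytes.Repeat([]byte{0xab}, 32) // an unprefixed tx hash
	fmt.Println(isLegacyEvidenceKey(hash)) // false: fails the structural check
}
```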
William Banfield
229ec9dd6e docs: minor fixups to pbts overview (#8454) 2022-05-06 10:25:00 -04:00
dependabot[bot]
1e2fd769c2 build(deps): Bump github.com/creachadair/atomicfile from 0.2.5 to 0.2.6 (#8460)
Bumps [github.com/creachadair/atomicfile](https://github.com/creachadair/atomicfile) from 0.2.5 to 0.2.6.
- [Release notes](https://github.com/creachadair/atomicfile/releases)
- [Commits](https://github.com/creachadair/atomicfile/compare/v0.2.5...v0.2.6)

---
updated-dependencies:
- dependency-name: github.com/creachadair/atomicfile
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2022-05-06 10:25:00 -04:00
tycho garen
6e16cd1ff8 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-05-02 12:51:14 -04:00
Sam Kleinman
03879849d9 p2p: make p2p.Channel an interface (#8446) 2022-04-29 21:33:15 -04:00
tycho garen
82f66026a5 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-04-29 17:22:42 -04:00
Sam Kleinman
9f94161163 p2p: scope and add libp2p components (#8443) 2022-04-29 11:57:40 -04:00
tycho garen
e7ed0720fd Merge remote-tracking branch 'origin/master' into main/libp2p 2022-04-29 10:30:20 -04:00
tycho garen
83239d2b06 Merge remote-tracking branch 'origin/master' into main/libp2p 2022-04-29 09:43:06 -04:00
Thane Thomson
516ff45392 abci++: Vote extension cleanup (#8402)
* Split vote verification/validation based on vote extensions

Some parts of the code need vote extensions to be verified and
validated (mostly in consensus), and other parts of the code don't
because it's possible that, in some cases (as per RFC 017), we won't have
vote extensions.

This explicitly facilitates that split.

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Only sign extensions in precommits, not prevotes

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Update privval/file.go

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>

* Apply suggestions from code review

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>

* Temporarily disable extension requirement again for E2E testing

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Reorganize comment for clarity

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Leave vote validation and pre-call nil check up to caller of VoteToProto

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Split complex vote validation test into multiple tests

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Universally enforce no vote extensions on any vote type but precommits

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Make error messages more generic

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Verify with vote extensions when constructing a VoteSet

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Expand comment for clarity

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add extension check for prevotes prior to signing votes

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Fix supporting test code to only inject extensions into precommits

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Separate vote malleation from signing in vote tests for clarity

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Add extension signature length check and corresponding test

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Perform basic vote validation in CommitToVoteSet

Signed-off-by: Thane Thomson <connect@thanethomson.com>

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-04-28 17:54:33 -04:00
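
To make the invariant these changes converge on concrete, here is a minimal sketch with simplified stand-in types (not the actual `types.Vote` validation code): extensions are legal only on precommits, and an extension implies an extension signature.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real tendermint vote types.
type SignedMsgType int

const (
	PrevoteType SignedMsgType = iota
	PrecommitType
)

type Vote struct {
	Type               SignedMsgType
	Extension          []byte
	ExtensionSignature []byte
}

// EnsureExtension sketches the rule: no vote type but precommits may
// carry extensions, and an extension must come with a signature.
func EnsureExtension(v *Vote) error {
	if v.Type != PrecommitType {
		if len(v.Extension) > 0 || len(v.ExtensionSignature) > 0 {
			return errors.New("only precommits may carry vote extensions")
		}
		return nil
	}
	if len(v.Extension) > 0 && len(v.ExtensionSignature) == 0 {
		return errors.New("vote extension present without a signature")
	}
	return nil
}

func main() {
	bad := &Vote{Type: PrevoteType, Extension: []byte("x")}
	fmt.Println(EnsureExtension(bad)) // only precommits may carry vote extensions
}
```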
M. J. Fromberger
4868bb48f5 Use patched link-checker for periodic checks. (#8430)
In #8339 we pointed the markdown link checker action to a patched version that
has the up-to-date version of the underlying check tool. In doing so, I missed
the periodic cron job that runs the same workflow. Update it to use the patched
version also.
2022-04-28 17:54:33 -04:00
Sam Kleinman
4844af2b8d p2p: move transport options into struct (#8432) 2022-04-28 17:51:00 -04:00
tycho garen
72405b450f Merge remote-tracking branch 'origin/master' into main/libp2p 2022-04-28 08:45:10 -04:00
Sam Kleinman
99b862228f node: start rpc service after reactors (#8426) 2022-04-27 15:57:30 -04:00
Sam Kleinman
385ad1e1e3 p2p: remove support for multiple transports and endpoints (#8420) 2022-04-27 15:57:30 -04:00
M. J. Fromberger
251e41895f Unify RPC method signatures and parameter decoding (#8397)
Pass all parameters from JSON-RPC requests to their corresponding handlers
using struct types instead of positional parameters. This allows us to control
encoding of arguments using only the standard library, and to eliminate the
remaining special-purpose JSON encoding hooks in the server.

To support existing use, the server still allows arguments to be encoded in
JSON as either an array or an object.

Related changes:

- Rework the RPCFunc constructor to reduce reflection during RPC call service.
- Add request parameter wrappers for each RPC service method.
- Update the RPC Environment methods to use these types.
- Update the interfaces and shims derived from Environment to the new
  signatures.
- Update and extend test cases.
2022-04-27 15:57:30 -04:00
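
A minimal sketch of the dual encoding the commit above retains, assuming a hypothetical `blockArgs` parameter struct (the real `RPCFunc` construction and `Environment` wrappers are more involved): the same struct decodes from either a JSON object or a positional array using only the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// blockArgs is a hypothetical per-method parameter struct in the style
// the commit describes, replacing positional parameters.
type blockArgs struct {
	Height int64 `json:"height"`
}

// decodeParams accepts params encoded as either a JSON object or an array.
func decodeParams(raw json.RawMessage, out *blockArgs) error {
	if len(raw) > 0 && raw[0] == '[' {
		var pos []json.RawMessage
		if err := json.Unmarshal(raw, &pos); err != nil {
			return err
		}
		if len(pos) > 0 {
			return json.Unmarshal(pos[0], &out.Height)
		}
		return nil
	}
	return json.Unmarshal(raw, out)
}

func main() {
	var a, b blockArgs
	_ = decodeParams(json.RawMessage(`{"height": 5}`), &a)
	_ = decodeParams(json.RawMessage(`[7]`), &b)
	fmt.Println(a.Height, b.Height) // 5 7
}
```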
elias-orijtech
0dda3ff88b fuzz: don't panic on expected errors (#8423)
In the conversion to Go 1.18 fuzzing in e4991fd862,
a `return 0` was converted to a panic. A `return 0` is a hint to the fuzzer, not
a failing testcase.

While here, clean up the test by folding setup code into it.
2022-04-27 15:57:30 -04:00
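
A generic sketch of the distinction drawn above, not the actual tendermint fuzz target: under Go 1.18 native fuzzing, an expected error should simply return (the analogue of go-fuzz's `return 0`), while only a genuine invariant violation should fail the test.

```go
package fuzz_test

import (
	"encoding/json"
	"testing"
)

func FuzzDecode(f *testing.F) {
	f.Add([]byte(`{"a":1}`)) // seed corpus
	f.Fuzz(func(t *testing.T, data []byte) {
		var v map[string]interface{}
		if err := json.Unmarshal(data, &v); err != nil {
			return // expected error: a hint to the fuzzer, not a failing testcase
		}
		if _, err := json.Marshal(v); err != nil {
			t.Fatalf("re-encode failed: %v", err) // a genuine bug
		}
	})
}
```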
dependabot[bot]
be501981e7 build(deps): Bump github.com/google/go-cmp from 0.5.7 to 0.5.8 (#8422)
Bumps [github.com/google/go-cmp](https://github.com/google/go-cmp) from 0.5.7 to 0.5.8.
Release notes (v0.5.8), from [github.com/google/go-cmp's releases](https://github.com/google/go-cmp/releases):

Reporter changes:
- Fix printing of types in reporter output for interface and pointer types (#293)
- Use string formatting for slice of bytes in more circumstances (#294)

Dependency changes:
- Update minimum supported version to go1.13 and remove `xerrors` dependency (#292)

Commits: see the [compare view](https://github.com/google/go-cmp/compare/v0.5.7...v0.5.8).
2022-04-27 15:57:30 -04:00
dependabot[bot]
d5b9bc4c18 build(deps): Bump github.com/vektra/mockery/v2 from 2.12.0 to 2.12.1 (#8417)
Bumps [github.com/vektra/mockery/v2](https://github.com/vektra/mockery) from 2.12.0 to 2.12.1.
Release notes (v2.12.1), from [github.com/vektra/mockery's releases](https://github.com/vektra/mockery/releases):

Changelog:
- facf60b Add extra test cases for increasing code coverage.
- 2e1360a Collapse if statements and rename interface in the fixtures.
- 8bdc90d Fix test on go1.18.
- fe03b57 Fix tests.
- b8c62f7 Fix: avoid package name collision with inPackage (#291)
- c9dc740 Merge pull request #422 from i-sevostyanov/fix-package-collision
- 58a7f18 Merge pull request #452 from grongor/refactor-first-letter-helper
- 749b2d6 Refactor mock name generation

Commits: see the [compare view](https://github.com/vektra/mockery/compare/v2.12.0...v2.12.1).
2022-04-27 15:57:30 -04:00
Thane Thomson
4859631a6a abci++: Remove intermediate protos (#8414)
* Sync protos with their intermediates

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Remove intermediate protos and their supporting scripts

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* make proto-gen

Signed-off-by: Thane Thomson <connect@thanethomson.com>
2022-04-27 15:57:30 -04:00
Sam Kleinman
4da31ca2c8 crypto: remove unused code (#8412) 2022-04-27 15:57:30 -04:00
Sam Kleinman
e41ba263be config: minor template infrastructure (#8411) 2022-04-27 15:57:30 -04:00
dependabot[bot]
da277baa69 build(deps): Bump google.golang.org/grpc from 1.45.0 to 1.46.0 (#8408)
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.45.0 to 1.46.0.
Release notes (1.46.0), from [google.golang.org/grpc's releases](https://github.com/grpc/grpc-go/releases):

New Features
- server: Support setting `TCP_USER_TIMEOUT` on `grpc.Server` connections using `keepalive.ServerParameters.Time` (#5219); special thanks: @bonnefoa
- client: perform graceful switching of LB policies in the `ClientConn` by default (#5285)
- all: improve logging by including channelz identifier in log messages (#5192)

API Changes
- grpc: delete `WithBalancerName()` API, deprecated over 4 years ago in #1697 (#5232)
- balancer: change BuildOptions.ChannelzParentID to an opaque identifier instead of int (#5192); note: the balancer package is labeled as EXPERIMENTAL, and we don't believe users were using this field.

Behavior Changes
- client: change connectivity state to `TransientFailure` in `pick_first` LB policy when all addresses are removed (#5274); a minor change that brings grpc-go's behavior in line with the intended behavior and how C and Java behave.
- metadata: add client-side validation of HTTP-invalid metadata before attempting to send (#4886); special thanks: @Patrick0308

Bug Fixes
- metadata: make a copy of the value slices in FromContext() functions so that modifications won't be made to the original copy (#5267)
- client: handle invalid service configs by applying the default, if applicable (#5238)
- xds: the xds client will now apply a 1 second backoff before recreating ADS or LRS streams (#5280)

Dependencies
- Upgrade security/authorization module dependencies to https://github.com/google/cel-go v0.10.1 and others (#5243); special thanks: @TristonianJones

Commits: see the [compare view](https://github.com/grpc/grpc-go/compare/v1.45.0...v1.46.0).
2022-04-27 15:57:30 -04:00
Sam Kleinman
19ba3c6375 p2p: add libp2p feature flag (#8410) 2022-04-25 11:47:31 -04:00
34 changed files with 1360 additions and 348 deletions


@@ -662,6 +662,11 @@ type P2PConfig struct { //nolint: maligned
// layer uses. Options are: "fifo" and "priority",
// with the default being "priority".
QueueType string `mapstructure:"queue-type"`
// UseLibP2P switches to using the new networking layer based
// on libp2p. This option is unlikely to persist into a
// release of tendermint, but will ease the transition.
UseLibP2P bool `mapstructure:"experimental-use-lib-p2p"`
}
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
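
As a usage sketch (mirroring only the field added above; the real struct lives in tendermint's config package), the flag would presumably be set as `experimental-use-lib-p2p = true` under the `[p2p]` section of config.toml, or programmatically:

```go
package main

import "fmt"

// P2PConfig mirrors only the new field from the diff above.
type P2PConfig struct {
	UseLibP2P bool `mapstructure:"experimental-use-lib-p2p"`
}

func main() {
	cfg := P2PConfig{UseLibP2P: true} // opt in to the libp2p-based layer
	fmt.Println(cfg.UseLibP2P)
}
```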

go.mod

@@ -67,6 +67,7 @@ require (
github.com/bombsimon/wsl/v3 v3.3.0 // indirect
github.com/breml/bidichk v0.2.3 // indirect
github.com/breml/errchkjson v0.3.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/butuzov/ireturn v0.1.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash v1.1.0 // indirect
@@ -77,6 +78,7 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/daixiang0/gci v0.3.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/denis-tingaikin/go-header v0.4.3 // indirect
github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de // indirect
@@ -118,6 +120,7 @@ require (
github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect
github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect
@@ -127,9 +130,16 @@ require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-version v1.4.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/huin/goupnp v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/ipfs/go-cid v0.2.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jdxcode/netrc v0.0.0-20210204082910-926c7f70242a // indirect
github.com/jgautheron/goconst v1.5.1 // indirect
github.com/jhump/protocompile v0.0.0-20220216033700-d705409f108f // indirect
@@ -141,13 +151,27 @@ require (
github.com/kisielk/errcheck v1.6.0 // indirect
github.com/kisielk/gotool v1.0.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d // indirect
github.com/kulti/thelper v0.6.2 // indirect
github.com/kunwardeep/paralleltest v1.0.3 // indirect
github.com/kyoh86/exportloopref v0.1.8 // indirect
github.com/ldez/gomoddirectives v0.2.3 // indirect
github.com/ldez/tagliatelle v0.3.1 // indirect
github.com/leonklingele/grouper v1.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-eventbus v0.2.1 // indirect
github.com/libp2p/go-libp2p v0.20.3 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-core v0.19.1 // indirect
github.com/libp2p/go-libp2p-discovery v0.6.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.6.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.7.1 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
github.com/lufeee/execinquery v1.0.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/maratori/testpackage v1.0.1 // indirect
@@ -158,9 +182,23 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mbilski/exhaustivestruct v1.2.0 // indirect
github.com/mgechev/revive v1.2.1 // indirect
github.com/miekg/dns v1.1.43 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moricho/tparallel v0.2.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.0.3 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multiaddr v0.5.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multicodec v0.4.1 // indirect
github.com/multiformats/go-multihash v0.1.0 // indirect
github.com/multiformats/go-multistream v0.3.3 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nakabonne/nestif v0.3.1 // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 // indirect
github.com/nishanths/exhaustive v0.7.11 // indirect
@@ -169,6 +207,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.3 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.2 // indirect
github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d // indirect
@@ -193,6 +232,8 @@ require (
github.com/sivchari/tenv v1.5.0 // indirect
github.com/sonatard/noctx v0.0.1 // indirect
github.com/sourcegraph/go-diff v0.6.1 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
@@ -208,9 +249,11 @@ require (
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 // indirect
github.com/tomarrell/wrapcheck/v2 v2.6.1 // indirect
github.com/tommy-muehle/go-mnd/v2 v2.5.0 // indirect
github.com/tychoish/emt v0.1.0 // indirect
github.com/ultraware/funlen v0.0.3 // indirect
github.com/ultraware/whitespace v0.0.5 // indirect
github.com/uudashr/gocognit v1.0.5 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
github.com/yagipy/maintidx v1.0.0 // indirect
github.com/yeya24/promlinter v0.2.0 // indirect
gitlab.com/bosi/decorder v0.2.1 // indirect
@@ -231,6 +274,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.3.1 // indirect
lukechampine.com/blake3 v1.1.6 // indirect
mvdan.cc/gofumpt v0.3.1 // indirect
mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed // indirect
mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b // indirect

go.sum

File diff suppressed because it is too large.


@@ -135,7 +135,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
if err != nil {
return err
}
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (p2p.Channel, error) { return blockSyncCh, nil }
state, err := r.stateStore.Load()
if err != nil {
@@ -183,7 +183,7 @@ func (r *Reactor) OnStop() {
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh p2p.Channel) error {
block := r.store.LoadBlock(msg.Height)
if block == nil {
r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
@@ -223,7 +223,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -298,7 +298,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo
// message execution will result in a PeerError being sent on the BlockSyncChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) {
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh p2p.Channel) {
iter := blockSyncCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
@@ -319,7 +319,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
}
// processPeerUpdate processes a PeerUpdate.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -354,7 +354,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh p2p.Channel) {
for {
select {
case <-ctx.Done():
@@ -396,7 +396,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
return nil
}
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) {
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh p2p.Channel) {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
@@ -438,7 +438,7 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel)
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) {
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh p2p.Channel) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)


@@ -37,7 +37,7 @@ type reactorTestSuite struct {
reactors map[types.NodeID]*Reactor
app map[types.NodeID]abciclient.Client
blockSyncChannels map[types.NodeID]*p2p.Channel
blockSyncChannels map[types.NodeID]p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate
peerUpdates map[types.NodeID]*p2p.PeerUpdates
}
@@ -64,7 +64,7 @@ func setup(
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]abciclient.Client, numNodes),
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
blockSyncChannels: make(map[types.NodeID]p2p.Channel, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
}
@@ -177,7 +177,7 @@ func (rts *reactorTestSuite) addNode(
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
return rts.blockSyncChannels[nodeID], nil
}


@@ -107,7 +107,7 @@ func invalidDoPrevoteFunc(
round int32,
cs *State,
r *Reactor,
voteCh *p2p.Channel,
voteCh p2p.Channel,
pv types.PrivValidator,
) {
// routine to:


@@ -165,10 +165,10 @@ func NewReactor(
}
type channelBundle struct {
state *p2p.Channel
data *p2p.Channel
vote *p2p.Channel
votSet *p2p.Channel
state p2p.Channel
data p2p.Channel
vote p2p.Channel
votSet p2p.Channel
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -310,14 +310,14 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
return ps, ok
}
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: makeRoundStepMessage(rs),
})
}
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
psHeader := rs.ProposalBlockParts.Header()
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
@@ -331,7 +331,7 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes
})
}
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error {
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &tmcons.HasVote{
@@ -346,7 +346,7 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote,
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh *p2p.Channel) {
func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh p2p.Channel) {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent(
@@ -403,7 +403,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
}
}
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error {
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: makeRoundStepMessage(r.getRoundState()),
@@ -433,7 +433,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState {
return r.rs
}
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) {
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh p2p.Channel) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
@@ -497,7 +497,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
time.Sleep(r.state.config.PeerGossipSleepDuration)
}
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
timer := time.NewTimer(0)
@@ -632,7 +632,7 @@ OUTER_LOOP:
// pickSendVote picks a vote and sends it to the peer. It will return true if
// there is a vote to send and false otherwise.
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) {
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh p2p.Channel) (bool, error) {
vote, ok := ps.PickVoteToSend(votes)
if !ok {
return false, nil
@@ -660,7 +660,7 @@ func (r *Reactor) gossipVotesForHeight(
rs *cstypes.RoundState,
prs *cstypes.PeerRoundState,
ps *PeerState,
voteCh *p2p.Channel,
voteCh p2p.Channel,
) (bool, error) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
@@ -732,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight(
return false, nil
}
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
timer := time.NewTimer(0)
@@ -804,7 +804,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh p2p.Channel) {
timer := time.NewTimer(0)
defer timer.Stop()
@@ -1015,7 +1015,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// If we fail to find the peer state for the envelope sender, we perform a no-op
// and return. This can happen when we process the envelope after the peer is
// removed.
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error {
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh p2p.Channel) error {
ps, ok := r.GetPeerState(envelope.From)
if !ok || ps == nil {
r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")


@@ -46,10 +46,10 @@ type reactorTestSuite struct {
reactors map[types.NodeID]*Reactor
subs map[types.NodeID]eventbus.Subscription
blocksyncSubs map[types.NodeID]eventbus.Subscription
stateChannels map[types.NodeID]*p2p.Channel
dataChannels map[types.NodeID]*p2p.Channel
voteChannels map[types.NodeID]*p2p.Channel
voteSetBitsChannels map[types.NodeID]*p2p.Channel
stateChannels map[types.NodeID]p2p.Channel
dataChannels map[types.NodeID]p2p.Channel
voteChannels map[types.NodeID]p2p.Channel
voteSetBitsChannels map[types.NodeID]p2p.Channel
}
func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor {
@@ -86,7 +86,7 @@ func setup(
t.Cleanup(cancel)
chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
switch desc.ID {
case StateChannel:
return rts.stateChannels[nodeID], nil


@@ -159,7 +159,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
// processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh.
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) {
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) {
iter := evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
@@ -186,7 +186,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel
// connects/disconnects frequently from the broadcasting peer(s).
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -227,7 +227,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) {
for {
select {
case peerUpdate := <-peerUpdates.Updates():
@@ -249,7 +249,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) {
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) {
var next *clist.CElement
defer func() {


@@ -38,7 +38,7 @@ type reactorTestSuite struct {
logger log.Logger
reactors map[types.NodeID]*evidence.Reactor
pools map[types.NodeID]*evidence.Pool
evidenceChannels map[types.NodeID]*p2p.Channel
evidenceChannels map[types.NodeID]p2p.Channel
peerUpdates map[types.NodeID]*p2p.PeerUpdates
peerChans map[types.NodeID]chan p2p.PeerUpdate
nodes []*p2ptest.Node
@@ -96,7 +96,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
return rts.evidenceChannels[nodeID], nil
}


@@ -194,7 +194,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er
// processMempoolCh implements a blocking event loop where we listen for p2p
// Envelope messages from the mempoolCh.
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) {
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh p2p.Channel) {
iter := mempoolCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
@@ -215,7 +215,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel)
// goroutine or not. If not, we start one for the newly added peer. For down or
// removed peers, we remove the peer from the mempool peer ID set and signal to
// stop the tx broadcasting goroutine.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -264,7 +264,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh p2p.Channel) {
for {
select {
case <-ctx.Done():
@@ -275,7 +275,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
}
}
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) {
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh p2p.Channel) {
peerMempoolID := r.ids.GetForPeer(peerID)
var nextGossipTx *clist.CElement


@@ -30,7 +30,7 @@ type reactorTestSuite struct {
logger log.Logger
reactors map[types.NodeID]*Reactor
mempoolChannels map[types.NodeID]*p2p.Channel
mempoolChannels map[types.NodeID]p2p.Channel
mempools map[types.NodeID]*TxMempool
kvstores map[types.NodeID]*kvstore.Application
@@ -51,7 +51,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
logger: log.NewNopLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
reactors: make(map[types.NodeID]*Reactor, numNodes),
mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
mempoolChannels: make(map[types.NodeID]p2p.Channel, numNodes),
mempools: make(map[types.NodeID]*TxMempool, numNodes),
kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
@@ -75,7 +75,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
return rts.mempoolChannels[nodeID], nil
}


@@ -1,12 +1,22 @@
package p2p
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tychoish/emt"
"github.com/tendermint/tendermint/internal/libs/protoio"
"github.com/tendermint/tendermint/types"
)
@@ -37,6 +47,16 @@ type Wrapper interface {
Unwrap() (proto.Message, error)
}
type Channel interface {
fmt.Stringer
Err() error
Send(context.Context, Envelope) error
SendError(context.Context, PeerError) error
Receive(context.Context) *ChannelIterator
}
// PeerError is a peer error reported via Channel.Error.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
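// A consumer-side sketch of the Channel interface above (the consume
// helper and its handler are hypothetical; real reactors wire this up
// differently): drain the Receive iterator until the context ends,
// report per-peer failures via SendError, and check Err afterward.
func consume(ctx context.Context, ch Channel, handle func(*Envelope) error) error {
	iter := ch.Receive(ctx)
	for iter.Next(ctx) {
		if err := handle(iter.Envelope()); err != nil {
			if serr := ch.SendError(ctx, PeerError{NodeID: iter.Envelope().From, Err: err}); serr != nil {
				return serr
			}
		}
	}
	return ch.Err() // surface errors collected during iteration
}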
@@ -56,9 +76,9 @@ type PeerError struct {
func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
func (pe PeerError) Unwrap() error { return pe.Err }
// Channel is a bidirectional channel to exchange Protobuf messages with peers.
// legacyChannel is a bidirectional channel to exchange Protobuf messages with peers.
// Each message is wrapped in an Envelope to specify its sender and receiver.
type Channel struct {
type legacyChannel struct {
ID ChannelID
inCh <-chan Envelope // inbound messages (peers to reactors)
outCh chan<- Envelope // outbound messages (reactors to peers)
@@ -69,9 +89,10 @@ type Channel struct {
// NewChannel creates a new channel. It is primarily for internal and test
// use, reactors should use Router.OpenChannel().
func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) *Channel {
return &Channel{
func NewChannel(id ChannelID, name string, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) Channel {
return &legacyChannel{
ID: id,
name: name,
inCh: inCh,
outCh: outCh,
errCh: errCh,
@@ -80,7 +101,7 @@ func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh
// Send blocks until the envelope has been sent, or until ctx ends.
// An error only occurs if the context ends before the send completes.
func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
func (ch *legacyChannel) Send(ctx context.Context, envelope Envelope) error {
select {
case <-ctx.Done():
return ctx.Err()
@@ -89,9 +110,15 @@ func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
}
}
func (ch *legacyChannel) Err() error { return nil }
// SendError blocks until the given error has been sent, or ctx ends.
// An error only occurs if the context ends before the send completes.
func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
func (ch *legacyChannel) SendError(ctx context.Context, pe PeerError) error {
if errors.Is(pe.Err, context.Canceled) || errors.Is(pe.Err, context.DeadlineExceeded) {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
@@ -100,18 +127,29 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
}
}
func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
// Receive returns a new unbuffered iterator to receive messages from ch.
// The iterator runs until ctx ends.
func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator {
iter := &ChannelIterator{
pipe: make(chan Envelope), // unbuffered
}
go func() {
go func(pipe chan Envelope) {
defer close(iter.pipe)
iteratorWorker(ctx, ch, iter.pipe)
}()
for {
select {
case <-ctx.Done():
return
case envelope := <-ch.inCh:
select {
case <-ctx.Done():
return
case pipe <- envelope:
}
}
}
}(iter.pipe)
return iter
}
@@ -126,21 +164,6 @@ type ChannelIterator struct {
current *Envelope
}
func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
for {
select {
case <-ctx.Done():
return
case envelope := <-ch.inCh:
select {
case <-ctx.Done():
return
case pipe <- envelope:
}
}
}
}
// Next returns true when the Envelope value has advanced, and false
// when the context is canceled or iteration should stop. If an iterator has returned false,
// it will never return true again.
@@ -179,7 +202,7 @@ func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
//
// This allows the caller to consume messages from multiple channels
// without needing to manage the concurrency separately.
func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator {
func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator {
iter := &ChannelIterator{
pipe: make(chan Envelope), // unbuffered
}
@@ -187,10 +210,17 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato
for _, ch := range chs {
wg.Add(1)
go func(ch *Channel) {
go func(ch Channel, pipe chan Envelope) {
defer wg.Done()
iteratorWorker(ctx, ch, iter.pipe)
}(ch)
iter := ch.Receive(ctx)
for iter.Next(ctx) {
select {
case <-ctx.Done():
return
case pipe <- *iter.Envelope():
}
}
}(ch, iter.pipe)
}
done := make(chan struct{})
@@ -207,3 +237,254 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato
return iter
}
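// A usage sketch for the merged iterator (the channel variables and
// dispatch are hypothetical): envelopes from all channels arrive
// interleaved on a single unbuffered pipe.
func consumeMerged(ctx context.Context, stateCh, voteCh Channel) {
	iter := MergedChannelIterator(ctx, stateCh, voteCh)
	for iter.Next(ctx) {
		e := iter.Envelope()
		_ = e // dispatch on e.ChannelID to the appropriate handler
	}
}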
type libp2pChannelImpl struct {
chDesc *ChannelDescriptor
pubsub *pubsub.PubSub
host host.Host
topic *pubsub.Topic
recvCh chan Envelope
chainID string
wrapper Wrapper
// thread-safe error aggregator used to collect errors seen
// during receiving messages into an iterator.
errs emt.Catcher
// the context is passed when the channel is opened and should
// cover the lifecycle of the channel itself. The contexts
// passed into methods should cover the lifecycle of the
// operations they represent.
ctx context.Context
}
func NewLibP2PChannel(ctx context.Context, chainID string, chDesc *ChannelDescriptor, ps *pubsub.PubSub, h host.Host) (Channel, error) {
ch := &libp2pChannelImpl{
ctx: ctx,
chDesc: chDesc,
pubsub: ps,
host: h,
chainID: chainID,
recvCh: make(chan Envelope, chDesc.RecvMessageCapacity),
errs: emt.NewCatcher(),
}
topic, err := ps.Join(ch.canonicalizedTopicName())
if err != nil {
return nil, err
}
ch.topic = topic
if w, ok := chDesc.MessageType.(Wrapper); ok {
ch.wrapper = w
}
return ch, nil
}
func (ch *libp2pChannelImpl) String() string {
return fmt.Sprintf("Channel<%s>", ch.canonicalizedTopicName())
}
func (ch *libp2pChannelImpl) Err() error { return ch.errs.Resolve() }
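// canonicalizedTopicName derives the pubsub topic for this channel:
// e.g. chainID "test-chain", channel name "pex", and ID 0 yield
// "test-chain.pex.0".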
func (ch *libp2pChannelImpl) canonicalizedTopicName() string {
return fmt.Sprintf("%s.%s.%d", ch.chainID, ch.chDesc.Name, ch.chDesc.ID)
}
func (ch *libp2pChannelImpl) Receive(ctx context.Context) *ChannelIterator {
// TODO: consider caching an iterator in the channel, or
// erroring if this gets called more than once.
//
// While it's safe to register a handler more than once, we
// could get into a dodgy situation where, if you call Receive
// more than once, subsequent messages won't be routed
// correctly.
iter := &ChannelIterator{
pipe: make(chan Envelope),
}
ch.host.SetStreamHandler(protocol.ID(ch.canonicalizedTopicName()), func(stream network.Stream) {
// TODO: properly capture the max message size here.
reader := protoio.NewDelimitedReader(bufio.NewReader(stream), ch.chDesc.RecvBufferCapacity*2)
remote := stream.Conn().RemotePeer()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() { <-ctx.Done(); ch.errs.Add(stream.Close()) }()
for {
payload := proto.Clone(ch.chDesc.MessageType)
if _, err := reader.ReadMsg(payload); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
ch.errs.Add(err)
continue
}
select {
case <-ctx.Done():
return
case iter.pipe <- Envelope{
From: types.NodeID(remote),
Message: payload,
ChannelID: ch.chDesc.ID,
}:
}
}
})
sub, err := ch.topic.Subscribe()
if err != nil {
ch.errs.Add(err)
return nil
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case e := <-ch.recvCh:
select {
case <-ctx.Done():
return
case iter.pipe <- e:
continue
}
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
msg, err := sub.Next(ctx)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
ch.errs.Add(err)
return
}
payload := proto.Clone(ch.chDesc.MessageType)
if err := proto.Unmarshal(msg.Data, payload); err != nil {
ch.errs.Add(err)
return
}
if wrapper, ok := payload.(Wrapper); ok {
if payload, err = wrapper.Unwrap(); err != nil {
ch.errs.Add(err)
return
}
}
select {
case <-ctx.Done():
return
case iter.pipe <- Envelope{
From: types.NodeID(msg.From),
Message: payload,
ChannelID: ch.chDesc.ID,
}:
}
}
}()
// TODO: this is probably wrong in its current form: the
// handler for point-to-point messages could still end up
// trying to send into the pipe after things close.
go func() { wg.Wait(); close(iter.pipe) }()
return iter
}
func (ch *libp2pChannelImpl) Send(ctx context.Context, e Envelope) error {
if ch.wrapper != nil {
msg := proto.Clone(ch.wrapper)
if err := msg.(Wrapper).Wrap(e.Message); err != nil {
return err
}
e.Message = msg
}
if e.Broadcast {
e.From = types.NodeID(ch.host.ID())
bz, err := proto.Marshal(e.Message)
if err != nil {
return err
}
return ch.topic.Publish(ctx, bz)
}
switch ch.host.Network().Connectedness(peer.ID(e.To)) {
case network.CannotConnect:
return fmt.Errorf("cannot connect to %q", e.To)
default:
stream, err := ch.getStream(peer.ID(e.To))
if err != nil {
return err
}
bw := bufio.NewWriter(stream)
writer := protoio.NewDelimitedWriter(bw)
if _, err := writer.WriteMsg(e.Message); err != nil {
return err
}
// flush the buffered writer so the delimited message actually
// reaches the stream instead of sitting in the buffer.
return bw.Flush()
}
return nil
}
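For illustration (ch, msg, and peerID are assumed), the Envelope fields select between the two delivery paths implemented above, pubsub broadcast versus a direct stream:
// fan out to every subscriber of the channel's topic
if err := ch.Send(ctx, Envelope{Broadcast: true, Message: msg}); err != nil {
return err
}
// deliver the same message to a single peer over a libp2p stream
if err := ch.Send(ctx, Envelope{To: peerID, Message: msg}); err != nil {
return err
}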
func (ch *libp2pChannelImpl) getStream(peer peer.ID) (network.Stream, error) {
conns := ch.host.Network().ConnsToPeer(peer)
pid := protocol.ID(ch.canonicalizedTopicName())
if len(conns) > 0 {
for cidx := range conns {
streams := conns[cidx].GetStreams()
for sidx := range streams {
stream := streams[sidx]
if stream.Protocol() == pid && stream.Stat().Direction == network.DirOutbound {
return stream, nil
}
}
}
}
conn, err := ch.host.Network().DialPeer(ch.ctx, peer)
if err != nil {
return nil, err
}
stream, err := ch.host.NewStream(ch.ctx, conn.RemotePeer(), pid)
if err != nil {
return nil, err
}
return stream, nil
}
func (ch *libp2pChannelImpl) SendError(ctx context.Context, pe PeerError) error {
if errors.Is(pe.Err, context.Canceled) || errors.Is(pe.Err, context.DeadlineExceeded) || ctx.Err() != nil {
return nil
}
// TODO: change the handling of errors to peers. This probably
// shouldn't be handled as a property of the channel, but
// rather as part of some peer-info/network-management
// interface; we can do it here for now to ensure compatibility.
//
// Closing the peer is the same behavior as the legacy system,
// and seems less drastic than blacklisting the peer forever.
return ch.host.Network().ClosePeer(peer.ID(pe.NodeID))
}


@@ -16,13 +16,13 @@ type channelInternal struct {
Error chan PeerError
}
func testChannel(size int) (*channelInternal, *Channel) {
func testChannel(size int) (*channelInternal, *legacyChannel) {
in := &channelInternal{
In: make(chan Envelope, size),
Out: make(chan Envelope, size),
Error: make(chan PeerError, size),
}
ch := &Channel{
ch := &legacyChannel{
inCh: in.In,
outCh: in.Out,
errCh: in.Error,

internal/p2p/host.go (new file)

@@ -0,0 +1,19 @@
package p2p
import (
"context"
"errors"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/config"
)
// NewHost constructs the default networking stack for a libp2p
// network and returns the top-level host object.
func NewHost(conf *config.P2PConfig) (host.Host, error) { return nil, errors.New("not implemented") }
// NewPubSub constructs a pubsub protocol using a libp2p host object.
func NewPubSub(ctx context.Context, conf *config.P2PConfig, host host.Host) (*pubsub.PubSub, error) {
return nil, errors.New("not implemented")
}
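Both constructors are deliberate stubs. As a rough sketch only, assuming go-libp2p's option-based constructor (libp2p.New from github.com/libp2p/go-libp2p, v0.19+; earlier releases also took a context) and gossipsub from go-libp2p-pubsub, the eventual wiring might resemble:
func NewHost(conf *config.P2PConfig) (host.Host, error) {
// default options shown; a real implementation would map conf
// fields (listen address, keys, connection limits) to libp2p options.
return libp2p.New()
}
func NewPubSub(ctx context.Context, conf *config.P2PConfig, h host.Host) (*pubsub.PubSub, error) {
return pubsub.NewGossipSub(ctx, h)
}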


@@ -146,8 +146,8 @@ func (n *Network) MakeChannels(
ctx context.Context,
t *testing.T,
chDesc *p2p.ChannelDescriptor,
) map[types.NodeID]*p2p.Channel {
channels := map[types.NodeID]*p2p.Channel{}
) map[types.NodeID]p2p.Channel {
channels := map[types.NodeID]p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannel(ctx, t, chDesc)
}
@@ -161,8 +161,8 @@ func (n *Network) MakeChannelsNoCleanup(
ctx context.Context,
t *testing.T,
chDesc *p2p.ChannelDescriptor,
) map[types.NodeID]*p2p.Channel {
channels := map[types.NodeID]*p2p.Channel{}
) map[types.NodeID]p2p.Channel {
channels := map[types.NodeID]p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannelNoCleanup(ctx, t, chDesc)
}
@@ -267,10 +267,11 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
p2p.NopMetrics(),
privKey,
peerManager,
func() *types.NodeInfo { return &nodeInfo },
transport,
ep,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
LegacyTransport: transport,
LegacyEndpoint: ep,
},
)
require.NoError(t, err)
@@ -304,7 +305,7 @@ func (n *Node) MakeChannel(
ctx context.Context,
t *testing.T,
chDesc *p2p.ChannelDescriptor,
) *p2p.Channel {
) p2p.Channel {
ctx, cancel := context.WithCancel(ctx)
channel, err := n.Router.OpenChannel(ctx, chDesc)
require.NoError(t, err)
@@ -321,7 +322,7 @@ func (n *Node) MakeChannelNoCleanup(
ctx context.Context,
t *testing.T,
chDesc *p2p.ChannelDescriptor,
) *p2p.Channel {
) p2p.Channel {
channel, err := n.Router.OpenChannel(ctx, chDesc)
require.NoError(t, err)
return channel


@@ -15,7 +15,7 @@ import (
)
// RequireEmpty requires that the given channel is empty.
func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) {
t.Helper()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
@@ -32,7 +32,7 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
}
// RequireReceive requires that the given envelope is received on the channel.
func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
func RequireReceive(ctx context.Context, t *testing.T, channel p2p.Channel, expect p2p.Envelope) {
t.Helper()
ctx, cancel := context.WithTimeout(ctx, time.Second)
@@ -54,7 +54,7 @@ func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, exp
// RequireReceiveUnordered requires that the given envelopes are all received on
// the channel, ignoring order.
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) {
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel p2p.Channel, expect []*p2p.Envelope) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
@@ -75,7 +75,7 @@ func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Cha
}
// RequireSend requires that the given envelope is sent on the channel.
func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) {
func RequireSend(ctx context.Context, t *testing.T, channel p2p.Channel, envelope p2p.Envelope) {
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
@@ -93,7 +93,7 @@ func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelo
func RequireSendReceive(
ctx context.Context,
t *testing.T,
channel *p2p.Channel,
channel p2p.Channel,
peerID types.NodeID,
send proto.Message,
receive proto.Message,
@@ -116,7 +116,7 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp
}
// RequireError requires that the given peer error is submitted for a peer.
func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) {
func RequireError(ctx context.Context, t *testing.T, channel p2p.Channel, peerError p2p.PeerError) {
tctx, tcancel := context.WithTimeout(ctx, time.Second)
defer tcancel()


@@ -145,7 +145,7 @@ func (r *Reactor) OnStop() {}
// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
func (r *Reactor) processPexCh(ctx context.Context, pexCh p2p.Channel) {
incoming := make(chan *p2p.Envelope)
go func() {
defer close(incoming)
@@ -192,8 +192,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
// A request from another peer, or a response to one of our requests.
dur, err := r.handlePexMessage(ctx, envelope, pexCh)
if err != nil {
r.logger.Error("failed to process message",
"ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := pexCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
@@ -225,7 +224,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
// handlePexMessage handles envelopes sent from peers on the PexChannel.
// If an update was received, a new polling interval is returned; otherwise the
// duration is 0.
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) {
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh p2p.Channel) (time.Duration, error) {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -308,7 +307,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// that peer a request for more peer addresses. The chosen peer is moved into
// the requestsSent bucket so that we will not attempt to contact them again
// until they've replied or updated.
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error {
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh p2p.Channel) error {
r.mtx.Lock()
defer r.mtx.Unlock()
if len(r.availablePeers) == 0 {


@@ -275,7 +275,7 @@ type singleTestReactor struct {
pexInCh chan p2p.Envelope
pexOutCh chan p2p.Envelope
pexErrCh chan p2p.PeerError
pexCh *p2p.Channel
pexCh p2p.Channel
peerCh chan p2p.PeerUpdate
manager *p2p.PeerManager
}
@@ -287,8 +287,11 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
pexInCh := make(chan p2p.Envelope, chBuf)
pexOutCh := make(chan p2p.Envelope, chBuf)
pexErrCh := make(chan p2p.PeerError, chBuf)
chDesc := pex.ChannelDescriptor()
pexCh := p2p.NewChannel(
p2p.ChannelID(pex.PexChannel),
chDesc.ID,
chDesc.Name,
pexInCh,
pexOutCh,
pexErrCh,
@@ -299,7 +302,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
return pexCh, nil
}
@@ -324,7 +327,7 @@ type reactorTestSuite struct {
logger log.Logger
reactors map[types.NodeID]*pex.Reactor
pexChannels map[types.NodeID]*p2p.Channel
pexChannels map[types.NodeID]p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate
peerUpdates map[types.NodeID]*p2p.PeerUpdates
@@ -367,7 +370,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
logger: log.NewNopLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, networkOpts),
reactors: make(map[types.NodeID]*pex.Reactor, realNodes),
pexChannels: make(map[types.NodeID]*p2p.Channel, opts.TotalNodes),
pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, opts.TotalNodes),
total: opts.TotalNodes,
@@ -388,7 +391,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
return rts.pexChannels[nodeID], nil
}
@@ -448,7 +451,7 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID])
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
return r.pexChannels[nodeID], nil
}


@@ -11,6 +11,8 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/log"
@@ -65,6 +67,20 @@ type RouterOptions struct {
// are used to dial peers. This defaults to the value of
// runtime.NumCPU.
NumConcurrentDials func() int
// NodeInfoProducer returns a reference to the current
// NodeInfo object for use in adding channels.
NodeInfoProducer func() *types.NodeInfo
// UseLibP2P toggles the use of the new networking layer
// within the router.
UseLibP2P bool
LegacyTransport Transport
LegacyEndpoint *Endpoint
NetworkHost host.Host
NetworkPubSub *pubsub.PubSub
}
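Per Validate below, the legacy and libp2p field sets are mutually exclusive, so a caller populates exactly one of them. A hypothetical construction (nodeInfo, transport, endpoint, h, and ps built elsewhere):
// legacy stack
opts := RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
LegacyTransport:  transport,
LegacyEndpoint:   endpoint,
}
// libp2p stack
opts = RouterOptions{
UseLibP2P:        true,
NodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
NetworkHost:      h,
NetworkPubSub:    ps,
}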
const (
@@ -84,6 +100,38 @@ func (o *RouterOptions) Validate() error {
return fmt.Errorf("queue type %q is not supported", o.QueueType)
}
if o.NodeInfoProducer == nil {
return errors.New("must specify a NodeInfoProducer")
}
if o.UseLibP2P {
if o.LegacyTransport != nil {
return errors.New("when using libp2p you must not specify legacy components (transport)")
}
if o.LegacyEndpoint != nil {
return errors.New("when using libp2p you must not specify legacy components (endpoint)")
}
if o.NetworkHost == nil {
return errors.New("when using libp2p you must specify network components (host)")
}
if o.NetworkPubSub == nil {
return errors.New("when using libp2p you must specify network components (pubsub)")
}
} else {
if o.LegacyTransport == nil {
return errors.New("when using legacy p2p you must specify a transport")
}
if o.LegacyEndpoint == nil {
return errors.New("when using legacy p2p you must specify an endpoint")
}
if o.NetworkHost != nil {
return errors.New("when using legacy p2p you must not specify libp2p components (host)")
}
if o.NetworkPubSub != nil {
return errors.New("when using legacy p2p you must not specify libp2p components (pubsub)")
}
}
switch {
case o.IncomingConnectionWindow == 0:
o.IncomingConnectionWindow = 100 * time.Millisecond
@@ -140,76 +188,87 @@ func (o *RouterOptions) Validate() error {
// quality of service.
type Router struct {
*service.BaseService
logger log.Logger
logger log.Logger
metrics *Metrics
lc *metricsLabelCache
options RouterOptions
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []*ChannelDescriptor
transport Transport
endpoint *Endpoint
connTracker connectionTracker
options RouterOptions
privKey crypto.PrivKey
chDescs []*ChannelDescriptor
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
nodeInfoProducer func() *types.NodeInfo
chainID string
// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
// dynamic channels in the future.
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
channelMessages map[ChannelID]proto.Message
legacy struct {
peerManager *PeerManager
transport Transport
endpoint *Endpoint
connTracker connectionTracker
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
// dynamic channels in the future.
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
channelMessages map[ChannelID]proto.Message
}
network struct {
host host.Host // network handle for ourselves
ps *pubsub.PubSub
mtx sync.Mutex
channels map[string]Channel
}
}
// NewRouter creates a new Router. The given Transports must already be
// listening on appropriate interfaces, and will be closed by the Router when it
// stops.
func NewRouter(
logger log.Logger,
metrics *Metrics,
privKey crypto.PrivKey,
peerManager *PeerManager,
nodeInfoProducer func() *types.NodeInfo,
transport Transport,
endpoint *Endpoint,
options RouterOptions,
) (*Router, error) {
if err := options.Validate(); err != nil {
func NewRouter(logger log.Logger, metrics *Metrics, key crypto.PrivKey, pm *PeerManager, opts RouterOptions) (*Router, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
router := &Router{
logger: logger,
metrics: metrics,
lc: newMetricsLabelCache(),
privKey: privKey,
nodeInfoProducer: nodeInfoProducer,
connTracker: newConnTracker(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
),
chDescs: make([]*ChannelDescriptor, 0),
transport: transport,
endpoint: endpoint,
peerManager: peerManager,
options: options,
channelQueues: map[ChannelID]queue{},
channelMessages: map[ChannelID]proto.Message{},
peerQueues: map[types.NodeID]queue{},
peerChannels: make(map[types.NodeID]ChannelIDSet),
logger: logger,
metrics: metrics,
lc: newMetricsLabelCache(),
privKey: key,
nodeInfoProducer: opts.NodeInfoProducer,
options: opts,
chDescs: make([]*ChannelDescriptor, 0),
}
router.BaseService = service.NewBaseService(logger, "router", router)
return router, nil
switch {
case opts.UseLibP2P:
router.network.host = opts.NetworkHost
router.network.ps = opts.NetworkPubSub
return nil, errors.New("libp2p is not (yet) supported")
default:
router.legacy.connTracker = newConnTracker(
opts.MaxIncomingConnectionAttempts,
opts.IncomingConnectionWindow,
)
router.legacy.transport = opts.LegacyTransport
router.legacy.endpoint = opts.LegacyEndpoint
router.legacy.peerManager = pm
router.legacy.channelQueues = map[ChannelID]queue{}
router.legacy.channelMessages = map[ChannelID]proto.Message{}
router.legacy.peerQueues = map[types.NodeID]queue{}
router.legacy.peerChannels = make(map[types.NodeID]ChannelIDSet)
return router, nil
}
}
func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error) {
@@ -239,7 +298,7 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error
// ChannelCreator allows routers to construct their own channels,
// either by receiving a reference to Router.OpenChannel or by using
// some kind of shim for testing purposes.
type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
type ChannelCreator func(context.Context, *ChannelDescriptor) (Channel, error)
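The test suites in this change lean on exactly this seam; a shim that ignores the descriptor and hands back a pre-built channel (here pexCh) looks like:
chCreator := func(context.Context, *ChannelDescriptor) (Channel, error) {
return pexCh, nil
}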
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
@@ -247,50 +306,74 @@ type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
// implement Wrapper to automatically (un)wrap multiple message types in a
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) {
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Channel, error) {
switch {
case r.options.UseLibP2P:
info := r.nodeInfoProducer()
ch, err := NewLibP2PChannel(ctx, info.Network, chDesc, r.options.NetworkPubSub, r.options.NetworkHost)
if err != nil {
return nil, err
}
id := chDesc.ID
if _, ok := r.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
r.chDescs = append(r.chDescs, chDesc)
// TODO(tychoish): might be nice (though ultimately
// not particularly impactful(?)) to be able to get the
// canonical name for the channel without constructing
// it.
messageType := chDesc.MessageType
name := ch.String()
r.network.mtx.Lock()
defer r.network.mtx.Unlock()
if _, ok := r.network.channels[name]; ok {
// TODO(tychoish) actually maybe it would be ok to just
// return the existing channel.
return nil, fmt.Errorf("cannot construct channel %q more than once", name)
}
r.network.channels[name] = ch
queue := r.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(id, queue.dequeue(), outCh, errCh)
channel.name = chDesc.Name
return ch, nil
default:
r.legacy.channelMtx.Lock()
defer r.legacy.channelMtx.Unlock()
var wrapper Wrapper
if w, ok := messageType.(Wrapper); ok {
wrapper = w
}
if _, ok := r.legacy.channelQueues[chDesc.ID]; ok {
return nil, fmt.Errorf("channel %v already exists", chDesc.ID)
}
r.chDescs = append(r.chDescs, chDesc)
r.channelQueues[id] = queue
r.channelMessages[id] = messageType
messageType := chDesc.MessageType
// add the channel to the nodeInfo if it's not already there.
r.nodeInfoProducer().AddChannel(uint16(chDesc.ID))
queue := r.legacy.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(chDesc.ID, chDesc.Name, queue.dequeue(), outCh, errCh)
r.transport.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
var wrapper Wrapper
if w, ok := chDesc.MessageType.(Wrapper); ok {
wrapper = w
}
go func() {
defer func() {
r.channelMtx.Lock()
delete(r.channelQueues, id)
delete(r.channelMessages, id)
r.channelMtx.Unlock()
queue.close()
r.legacy.channelQueues[chDesc.ID] = queue
r.legacy.channelMessages[chDesc.ID] = messageType
// add the channel to the nodeInfo if it's not already there.
r.nodeInfoProducer().AddChannel(uint16(chDesc.ID))
r.legacy.transport.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
go func() {
defer func() {
r.legacy.channelMtx.Lock()
delete(r.legacy.channelQueues, chDesc.ID)
delete(r.legacy.channelMessages, chDesc.ID)
r.legacy.channelMtx.Unlock()
queue.close()
}()
r.routeChannel(ctx, chDesc.ID, outCh, errCh, wrapper)
}()
r.routeChannel(ctx, id, outCh, errCh, wrapper)
}()
return channel, nil
return channel, nil
}
}
// routeChannel receives outbound channel messages and routes them to the
@@ -329,11 +412,11 @@ func (r *Router) routeChannel(
// collect peer queues to pass the message via
var queues []queue
if envelope.Broadcast {
r.peerMtx.RLock()
r.legacy.peerMtx.RLock()
queues = make([]queue, 0, len(r.peerQueues))
for nodeID, q := range r.peerQueues {
peerChs := r.peerChannels[nodeID]
queues = make([]queue, 0, len(r.legacy.peerQueues))
for nodeID, q := range r.legacy.peerQueues {
peerChs := r.legacy.peerChannels[nodeID]
// check whether the peer is receiving on that channel
if _, ok := peerChs[chID]; ok {
@@ -341,19 +424,19 @@ func (r *Router) routeChannel(
}
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()
} else {
r.peerMtx.RLock()
r.legacy.peerMtx.RLock()
q, ok := r.peerQueues[envelope.To]
q, ok := r.legacy.peerQueues[envelope.To]
contains := false
if ok {
peerChs := r.peerChannels[envelope.To]
peerChs := r.legacy.peerChannels[envelope.To]
// check whether the peer is receiving on that channel
_, contains = peerChs[chID]
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()
if !ok {
r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
@@ -387,8 +470,13 @@ func (r *Router) routeChannel(
}
}
case peerError := <-errCh:
maxPeerCapacity := r.peerManager.HasMaxPeerCapacity()
case peerError, ok := <-errCh:
if !ok {
return
}
maxPeerCapacity := r.legacy.peerManager.HasMaxPeerCapacity()
r.logger.Error("peer error",
"peer", peerError.NodeID,
"err", peerError.Err,
@@ -399,16 +487,15 @@ func (r *Router) routeChannel(
// if the error is fatal or all peer
// slots are in use, we can error
// (disconnect) from the peer.
r.peerManager.Errored(peerError.NodeID, peerError.Err)
r.legacy.peerManager.Errored(peerError.NodeID, peerError.Err)
} else {
// this just decrements the peer
// score.
r.peerManager.processPeerEvent(ctx, PeerUpdate{
r.legacy.peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: peerError.NodeID,
Status: PeerStatusBad,
})
}
case <-ctx.Done():
return
}
@@ -455,10 +542,12 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
// in this case we got an error from the net.Listener.
r.logger.Error("failed to accept connection", "transport", transport, "err", err)
continue
case conn == nil:
continue
}
incomingIP := conn.RemoteEndpoint().IP
if err := r.connTracker.AddConn(incomingIP); err != nil {
if err := r.legacy.connTracker.AddConn(incomingIP); err != nil {
closeErr := conn.Close()
r.logger.Debug("rate limiting incoming peer",
"err", err,
@@ -477,7 +566,7 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
func (r *Router) openConnection(ctx context.Context, conn Connection) {
defer conn.Close()
defer r.connTracker.RemoveConn(conn.RemoteEndpoint().IP)
defer r.legacy.connTracker.RemoveConn(conn.RemoteEndpoint().IP)
re := conn.RemoteEndpoint()
incomingIP := re.IP
@@ -515,7 +604,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) {
return
}
if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
if err := r.runWithPeerMutex(func() error { return r.legacy.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
r.logger.Error("failed to accept connection",
"op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err)
return
@@ -553,7 +642,7 @@ func (r *Router) dialPeers(ctx context.Context) {
LOOP:
for {
address, err := r.peerManager.DialNext(ctx)
address, err := r.legacy.peerManager.DialNext(ctx)
switch {
case errors.Is(err, context.Canceled):
break LOOP
@@ -580,7 +669,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
return
case err != nil:
r.logger.Debug("failed to dial peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(ctx, address); err != nil {
if err = r.legacy.peerManager.DialFailed(ctx, address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
return
@@ -593,16 +682,16 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
return
case err != nil:
r.logger.Error("failed to handshake with peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(ctx, address); err != nil {
if err = r.legacy.peerManager.DialFailed(ctx, address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
conn.Close()
return
}
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
if err := r.runWithPeerMutex(func() error { return r.legacy.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err)
r.peerManager.dialWaker.Wake()
r.legacy.peerManager.dialWaker.Wake()
conn.Close()
return
}
@@ -612,16 +701,16 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
}
func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
defer r.legacy.peerMtx.Unlock()
if peerQueue, ok := r.peerQueues[peerID]; ok {
if peerQueue, ok := r.legacy.peerQueues[peerID]; ok {
return peerQueue
}
peerQueue := r.queueFactory(queueBufferDefault)
r.peerQueues[peerID] = peerQueue
r.peerChannels[peerID] = channels
peerQueue := r.legacy.queueFactory(queueBufferDefault)
r.legacy.peerQueues[peerID] = peerQueue
r.legacy.peerChannels[peerID] = channels
return peerQueue
}
@@ -658,7 +747,7 @@ func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection,
// by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
// on a private address on this endpoint, but a peer on the public
// Internet can't and needs a different public address.
conn, err := r.transport.Dial(dialCtx, endpoint)
conn, err := r.legacy.transport.Dial(dialCtx, endpoint)
if err != nil {
r.logger.Debug("failed to dial endpoint", "peer", address.NodeID, "endpoint", endpoint, "err", err)
} else {
@@ -697,7 +786,7 @@ func (r *Router) handshakePeer(
}
if err := nodeInfo.CompatibleWith(peerInfo); err != nil {
if err := r.peerManager.Inactivate(peerInfo.NodeID); err != nil {
if err := r.legacy.peerManager.Inactivate(peerInfo.NodeID); err != nil {
return peerInfo, fmt.Errorf("problem inactivating peer %q: %w", peerInfo.ID(), err)
}
@@ -711,8 +800,8 @@ func (r *Router) handshakePeer(
}
func (r *Router) runWithPeerMutex(fn func() error) error {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
defer r.legacy.peerMtx.Unlock()
return fn()
}
@@ -721,18 +810,18 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
// they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels ChannelIDSet) {
r.metrics.PeersConnected.Add(1)
r.peerManager.Ready(ctx, peerID, channels)
r.legacy.peerManager.Ready(ctx, peerID, channels)
sendQueue := r.getOrMakeQueue(peerID, channels)
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerID)
delete(r.peerChannels, peerID)
r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
delete(r.legacy.peerQueues, peerID)
delete(r.legacy.peerChannels, peerID)
r.legacy.peerMtx.Unlock()
sendQueue.close()
r.peerManager.Disconnected(ctx, peerID)
r.legacy.peerManager.Disconnected(ctx, peerID)
r.metrics.PeersConnected.Add(-1)
}()
@@ -795,10 +884,10 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
return err
}
r.channelMtx.RLock()
queue, ok := r.channelQueues[chID]
messageType := r.channelMessages[chID]
r.channelMtx.RUnlock()
r.legacy.channelMtx.RLock()
queue, ok := r.legacy.channelQueues[chID]
messageType := r.legacy.channelMessages[chID]
r.legacy.channelMtx.RUnlock()
if !ok {
r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
@@ -876,7 +965,7 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect
// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers(ctx context.Context) {
for {
peerID, err := r.peerManager.EvictNext(ctx)
peerID, err := r.legacy.peerManager.EvictNext(ctx)
switch {
case errors.Is(err, context.Canceled):
@@ -888,9 +977,9 @@ func (r *Router) evictPeers(ctx context.Context) {
r.logger.Info("evicting peer", "peer", peerID)
r.peerMtx.RLock()
queue, ok := r.peerQueues[peerID]
r.peerMtx.RUnlock()
r.legacy.peerMtx.RLock()
queue, ok := r.legacy.peerQueues[peerID]
r.legacy.peerMtx.RUnlock()
r.metrics.PeersEvicted.Add(1)
@@ -906,23 +995,29 @@ func (r *Router) setupQueueFactory(ctx context.Context) error {
return err
}
r.queueFactory = qf
r.legacy.queueFactory = qf
return nil
}
// OnStart implements service.Service.
func (r *Router) OnStart(ctx context.Context) error {
if r.options.UseLibP2P {
return nil
}
r.chainID = r.nodeInfoProducer().Network
if err := r.setupQueueFactory(ctx); err != nil {
return err
}
if err := r.transport.Listen(r.endpoint); err != nil {
if err := r.legacy.transport.Listen(r.legacy.endpoint); err != nil {
return err
}
go r.dialPeers(ctx)
go r.evictPeers(ctx)
go r.acceptPeers(ctx, r.transport)
go r.acceptPeers(ctx, r.legacy.transport)
return nil
}
@@ -934,25 +1029,38 @@ func (r *Router) OnStart(ctx context.Context) error {
// here, since that would cause any reactor senders to panic, so it is the
// sender's responsibility.
func (r *Router) OnStop() {
if r.options.UseLibP2P {
for name, ch := range r.network.channels {
if err := ch.Err(); err != nil {
r.logger.Error("shutting down channel",
"name", name,
"err", ch.Err(),
)
}
}
return
}
// Close transport listeners (unblocks Accept calls).
if err := r.transport.Close(); err != nil {
if err := r.legacy.transport.Close(); err != nil {
r.logger.Error("failed to close transport", "err", err)
}
// Collect all remaining queues, and wait for them to close.
queues := []queue{}
r.channelMtx.RLock()
for _, q := range r.channelQueues {
r.legacy.channelMtx.RLock()
for _, q := range r.legacy.channelQueues {
queues = append(queues, q)
}
r.channelMtx.RUnlock()
r.legacy.channelMtx.RUnlock()
r.peerMtx.RLock()
for _, q := range r.peerQueues {
r.legacy.peerMtx.RLock()
for _, q := range r.legacy.peerQueues {
queues = append(queues, q)
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()
for _, q := range queues {
q.close()


@@ -19,8 +19,7 @@ func TestConnectionFiltering(t *testing.T) {
filterByIPCount := 0
router := &Router{
logger: logger,
connTracker: newConnTracker(1, time.Second),
logger: logger,
options: RouterOptions{
FilterPeerByIP: func(ctx context.Context, ip net.IP, port uint16) error {
filterByIPCount++
@@ -28,6 +27,8 @@ func TestConnectionFiltering(t *testing.T) {
},
},
}
router.legacy.connTracker = newConnTracker(1, time.Second)
require.Equal(t, 0, filterByIPCount)
router.openConnection(ctx, &MemoryConnection{logger: logger, closeFn: func() {}})
require.Equal(t, 1, filterByIPCount)


@@ -11,47 +11,57 @@ import (
"github.com/tendermint/tendermint/types"
)
func getDefaultRouterOptions() RouterOptions {
return RouterOptions{
LegacyTransport: &MemoryTransport{},
LegacyEndpoint: &Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
}
func TestRouter_ConstructQueueFactory(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("ValidateOptionsPopulatesDefaultQueue", func(t *testing.T) {
opts := RouterOptions{}
opts := getDefaultRouterOptions()
require.NoError(t, opts.Validate())
require.Equal(t, "fifo", opts.QueueType)
})
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
_, ok := r.legacy.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = queueTypeFifo
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
_, ok := r.legacy.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = queueTypePriority
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
q, ok := r.queueFactory(1).(*pqScheduler)
q, ok := r.legacy.queueFactory(1).(*pqScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = "fast"
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})


@@ -14,6 +14,8 @@ import (
"github.com/fortytw2/leaktest"
"github.com/gogo/protobuf/proto"
gogotypes "github.com/gogo/protobuf/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
@@ -26,7 +28,70 @@ import (
"github.com/tendermint/tendermint/types"
)
func echoReactor(ctx context.Context, channel *p2p.Channel) {
func TestRouterConstruction(t *testing.T) {
t.Run("Legacy", func(t *testing.T) {
opts := p2p.RouterOptions{
UseLibP2P: false,
LegacyEndpoint: &p2p.Endpoint{},
LegacyTransport: &p2p.MemoryTransport{},
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
}
logger := log.NewNopLogger()
metrics := p2p.NopMetrics()
router, err := p2p.NewRouter(
logger,
metrics,
nil, // privkey
nil, // peermanager
opts,
)
if err != nil {
t.Fatal("problem constructing legacy router", err)
}
if router == nil {
t.Error("router was not constructed when it should not have been")
}
})
t.Run("LibP2P", func(t *testing.T) {
opts := p2p.RouterOptions{
UseLibP2P: true,
LegacyEndpoint: nil,
LegacyTransport: nil,
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
NetworkHost: &basichost.BasicHost{},
NetworkPubSub: &pubsub.PubSub{},
}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
}
logger := log.NewNopLogger()
metrics := p2p.NopMetrics()
router, err := p2p.NewRouter(
logger,
metrics,
nil, // privkey
nil, // peermanager
opts,
)
if err == nil {
t.Error("support for libp2p does not exist, and should prevent the router from being constructed")
} else if err.Error() != "libp2p is not (yet) supported" {
t.Errorf("incorrect error: %q", err.Error())
}
if router != nil {
t.Error("router was constructed and should not have have been")
}
})
}
func echoReactor(ctx context.Context, channel p2p.Channel) {
iter := channel.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
@@ -112,10 +177,11 @@ func TestRouter_Channel_Basic(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
testnet.RandomNode().Transport,
&p2p.Endpoint{},
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: testnet.RandomNode().Transport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
@@ -411,10 +477,11 @@ func TestRouter_AcceptPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -451,27 +518,27 @@ func TestRouter_AcceptPeers_Errors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
// Set up a mock transport that returns io.EOF once, which should prevent
// the router from calling Accept again.
mockTransport := &mocks.Transport{}
mockTransport.On("String").Maybe().Return("mock")
mockTransport.On("Accept", mock.Anything).Once().Return(nil, err)
mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
mockTransport.On("Close").Return(nil)
mockTransport.On("Listen", mock.Anything).Return(nil)
// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
@@ -523,10 +590,11 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -626,10 +694,11 @@ func TestRouter_DialPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -711,10 +780,9 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NumConcurrentDials: func() int {
ncpu := runtime.NumCPU()
if ncpu <= 3 {
@@ -784,10 +852,11 @@ func TestRouter_EvictPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -846,10 +915,11 @@ func TestRouter_ChannelCompatability(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -901,10 +971,11 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))


@@ -26,14 +26,14 @@ var (
// NOTE: It is not the responsibility of the dispatcher to verify the light blocks.
type Dispatcher struct {
// the channel with which to send light block requests on
requestCh *p2p.Channel
requestCh p2p.Channel
mtx sync.Mutex
// all pending calls that have been dispatched and are awaiting an answer
calls map[types.NodeID]chan *types.LightBlock
}
func NewDispatcher(requestChannel *p2p.Channel) *Dispatcher {
func NewDispatcher(requestChannel p2p.Channel) *Dispatcher {
return &Dispatcher{
requestCh: requestChannel,
calls: make(map[types.NodeID]chan *types.LightBlock),


@@ -24,13 +24,13 @@ type channelInternal struct {
Error chan p2p.PeerError
}
func testChannel(size int) (*channelInternal, *p2p.Channel) {
func testChannel(size int) (*channelInternal, p2p.Channel) {
in := &channelInternal{
In: make(chan p2p.Envelope, size),
Out: make(chan p2p.Envelope, size),
Error: make(chan p2p.PeerError, size),
}
return in, p2p.NewChannel(0, in.In, in.Out, in.Error)
return in, p2p.NewChannel(0, "test", in.In, in.Out, in.Error)
}
func TestDispatcherBasic(t *testing.T) {


@@ -305,7 +305,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
return nil
}
go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{
go r.processChannels(ctx, map[p2p.ChannelID]p2p.Channel{
SnapshotChannel: snapshotCh,
ChunkChannel: chunkCh,
LightBlockChannel: blockCh,
@@ -611,7 +611,7 @@ func (r *Reactor) backfill(
// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error {
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh p2p.Channel) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -683,7 +683,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error {
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ChunkRequest:
r.logger.Debug(
@@ -772,7 +772,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
return nil
}
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error {
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.LightBlockRequest:
r.logger.Info("received light block request", "height", msg.Height)
@@ -829,7 +829,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
return nil
}
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error {
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ParamsRequest:
r.logger.Debug("received consensus params request", "height", msg.Height)
@@ -878,7 +878,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -912,12 +912,12 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully.
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) {
// make sure that the iterator gets cleaned up in case of error
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]p2p.Channel) {
// make sure that the iterator gets cleaned up in case of error
ctx, cancel := context.WithCancel(ctx)
defer cancel()
chs := make([]*p2p.Channel, 0, len(chanTable))
chs := make([]p2p.Channel, 0, len(chanTable))
for key := range chanTable {
chs = append(chs, chanTable[key])
}


@@ -40,22 +40,22 @@ type reactorTestSuite struct {
conn *clientmocks.Client
stateProvider *mocks.StateProvider
snapshotChannel *p2p.Channel
snapshotChannel p2p.Channel
snapshotInCh chan p2p.Envelope
snapshotOutCh chan p2p.Envelope
snapshotPeerErrCh chan p2p.PeerError
chunkChannel *p2p.Channel
chunkChannel p2p.Channel
chunkInCh chan p2p.Envelope
chunkOutCh chan p2p.Envelope
chunkPeerErrCh chan p2p.PeerError
blockChannel *p2p.Channel
blockChannel p2p.Channel
blockInCh chan p2p.Envelope
blockOutCh chan p2p.Envelope
blockPeerErrCh chan p2p.PeerError
paramsChannel *p2p.Channel
paramsChannel p2p.Channel
paramsInCh chan p2p.Envelope
paramsOutCh chan p2p.Envelope
paramsPeerErrCh chan p2p.PeerError
@@ -102,6 +102,7 @@ func setup(
rts.snapshotChannel = p2p.NewChannel(
SnapshotChannel,
"snapshot",
rts.snapshotInCh,
rts.snapshotOutCh,
rts.snapshotPeerErrCh,
@@ -109,6 +110,7 @@ func setup(
rts.chunkChannel = p2p.NewChannel(
ChunkChannel,
"chunk",
rts.chunkInCh,
rts.chunkOutCh,
rts.chunkPeerErrCh,
@@ -116,6 +118,7 @@ func setup(
rts.blockChannel = p2p.NewChannel(
LightBlockChannel,
"lightblock",
rts.blockInCh,
rts.blockOutCh,
rts.blockPeerErrCh,
@@ -123,6 +126,7 @@ func setup(
rts.paramsChannel = p2p.NewChannel(
ParamsChannel,
"params",
rts.paramsInCh,
rts.paramsOutCh,
rts.paramsPeerErrCh,
@@ -133,7 +137,7 @@ func setup(
cfg := config.DefaultStateSyncConfig()
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
switch desc.ID {
case SnapshotChannel:
return rts.snapshotChannel, nil


@@ -208,7 +208,7 @@ type stateProviderP2P struct {
sync.Mutex // light.Client is not concurrency-safe
lc *light.Client
initialHeight int64
paramsSendCh *p2p.Channel
paramsSendCh p2p.Channel
paramsRecvCh chan types.ConsensusParams
}
@@ -220,7 +220,7 @@ func NewP2PStateProvider(
initialHeight int64,
providers []lightprovider.Provider,
trustOptions light.TrustOptions,
paramsSendCh *p2p.Channel,
paramsSendCh p2p.Channel,
logger log.Logger,
) (StateProvider, error) {
if len(providers) < 2 {


@@ -56,8 +56,8 @@ type syncer struct {
stateProvider StateProvider
conn abciclient.Client
snapshots *snapshotPool
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
snapshotCh p2p.Channel
chunkCh p2p.Channel
tempDir string
fetchers int32
retryTimeout time.Duration


@@ -88,6 +88,7 @@ func newDefaultNode(
}
if cfg.Mode == config.ModeSeed {
return makeSeedNode(
ctx,
logger,
cfg,
config.DefaultDBProvider,
@@ -248,7 +249,7 @@ func makeNode(
},
}
node.router, err = createRouter(logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
node.router, err = createRouter(ctx, logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
@@ -716,6 +717,7 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions {
opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType,
UseLibP2P: conf.P2P.UseLibP2P,
HandshakeTimeout: conf.P2P.HandshakeTimeout,
DialTimeout: conf.P2P.DialTimeout,
}


@@ -598,6 +598,7 @@ func TestNodeNewSeedNode(t *testing.T) {
logger := log.NewNopLogger()
ns, err := makeSeedNode(
ctx,
logger,
cfg,
config.DefaultDBProvider,


@@ -31,8 +31,7 @@ func NewDefault(
// Genesis document: if the value is nil, the genesis document is read
// from the file specified in the config, and otherwise the node uses
// value of the final argument.
func New(
ctx context.Context,
func New(ctx context.Context,
conf *config.Config,
logger log.Logger,
cf abciclient.Client,
@@ -68,7 +67,7 @@ func New(
config.DefaultDBProvider,
logger)
case config.ModeSeed:
return makeSeedNode(logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
return makeSeedNode(ctx, logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
default:
return nil, fmt.Errorf("%q is not a valid mode", conf.Mode)
}


@@ -39,6 +39,7 @@ type seedNodeImpl struct {
// makeSeedNode returns a new seed node, containing only p2p and the pex reactor
func makeSeedNode(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
@@ -74,7 +75,7 @@ func makeSeedNode(
closer)
}
router, err := createRouter(logger, p2pMetrics, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, nil)
router, err := createRouter(ctx, logger, p2pMetrics, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),


@@ -288,6 +288,7 @@ func createPeerManager(
}
func createRouter(
ctx context.Context,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfoProducer func() *types.NodeInfo,
@@ -298,22 +299,40 @@ func createRouter(
) (*p2p.Router, error) {
p2pLogger := logger.With("module", "p2p")
opts := getRouterConfig(cfg, appClient)
opts.NodeInfoProducer = nodeInfoProducer
transportConf := conn.DefaultMConnConfig()
transportConf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
transportConf.SendRate = cfg.P2P.SendRate
transportConf.RecvRate = cfg.P2P.RecvRate
transportConf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
transport := p2p.NewMConnTransport(
p2pLogger, transportConf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)
if cfg.P2P.UseLibP2P {
host, err := p2p.NewHost(cfg.P2P)
if err != nil {
return nil, err
}
opts.NetworkHost = host
ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))
if err != nil {
return nil, err
ps, err := p2p.NewPubSub(ctx, cfg.P2P, host)
if err != nil {
return nil, err
}
opts.NetworkPubSub = ps
} else {
transportConf := conn.DefaultMConnConfig()
transportConf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
transportConf.SendRate = cfg.P2P.SendRate
transportConf.RecvRate = cfg.P2P.RecvRate
transportConf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
transport := p2p.NewMConnTransport(
p2pLogger, transportConf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)
ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))
if err != nil {
return nil, err
}
opts.LegacyEndpoint = ep
opts.LegacyTransport = transport
}
return p2p.NewRouter(
@@ -321,10 +340,7 @@ func createRouter(
p2pMetrics,
nodeKey.PrivKey,
peerManager,
nodeInfoProducer,
transport,
ep,
getRouterConfig(cfg, appClient),
opts,
)
}