From 4322f7d0b94b3405c50217ac8091d7ec09d21e13 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 21 Jun 2022 18:51:50 +0200 Subject: [PATCH 01/21] mempool: make error throwing for CheckTx consistent (#8817) --- internal/mempool/v0/clist_mempool.go | 8 ++------ internal/mempool/v0/clist_mempool_test.go | 10 +++++++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/mempool/v0/clist_mempool.go b/internal/mempool/v0/clist_mempool.go index 0a12c7000..b37e745d8 100644 --- a/internal/mempool/v0/clist_mempool.go +++ b/internal/mempool/v0/clist_mempool.go @@ -242,17 +242,13 @@ func (mem *CListMempool) CheckTx( // so we only record the sender for txs still in the mempool. if e, ok := mem.txsMap.Load(tx.Key()); ok { memTx := e.(*clist.CElement).Value.(*mempoolTx) - _, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true) + memTx.senders.LoadOrStore(txInfo.SenderID, true) // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, // but they can spam the same tx with little cost to them atm. - if loaded { - return types.ErrTxInCache - } } - mem.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash()) - return nil + return types.ErrTxInCache } if ctx == nil { diff --git a/internal/mempool/v0/clist_mempool_test.go b/internal/mempool/v0/clist_mempool_test.go index 61ec543ef..774e32a96 100644 --- a/internal/mempool/v0/clist_mempool_test.go +++ b/internal/mempool/v0/clist_mempool_test.go @@ -200,7 +200,7 @@ func TestMempoolUpdate(t *testing.T) { err := mp.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) require.NoError(t, err) err = mp.CheckTx(context.Background(), []byte{0x01}, nil, mempool.TxInfo{}) - require.NoError(t, err) + assert.Error(t, err) } // 2. Removes valid txs from the mempool @@ -305,11 +305,15 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) { // a must be added to the cache err = mp.CheckTx(context.Background(), a, nil, mempool.TxInfo{}) - require.NoError(t, err) + if assert.Error(t, err) { + assert.Equal(t, types.ErrTxInCache, err) + } // b must remain in the cache err = mp.CheckTx(context.Background(), b, nil, mempool.TxInfo{}) - require.NoError(t, err) + if assert.Error(t, err) { + assert.Equal(t, types.ErrTxInCache, err) + } } // 2. An invalid transaction must remain in the cache From 034a9f84227bccb6936379c33d0338983081514c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 21 Jun 2022 17:16:31 -0400 Subject: [PATCH 02/21] build(deps): Bump github.com/spf13/cobra from 1.4.0 to 1.5.0 (#8811) Bumps [github.com/spf13/cobra](https://github.com/spf13/cobra) from 1.4.0 to 1.5.0. - [Release notes](https://github.com/spf13/cobra/releases) - [Commits](https://github.com/spf13/cobra/compare/v1.4.0...v1.5.0) --- updated-dependencies: - dependency-name: github.com/spf13/cobra dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Thane Thomson --- go.mod | 2 +- go.sum | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index f4d5abc9c..a72269ebc 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/rs/zerolog v1.27.0 github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa - github.com/spf13/cobra v1.4.0 + github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.7.2 github.com/tendermint/tm-db v0.6.6 diff --git a/go.sum b/go.sum index 435b316a0..ba72ebf98 100644 --- a/go.sum +++ b/go.sum @@ -232,6 +232,7 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creachadair/atomicfile v0.2.6 h1:FgYxYvGcqREApTY8Nxg8msM6P/KVKK3ob5h9FaRUTNg= github.com/creachadair/atomicfile v0.2.6/go.mod h1:BRq8Une6ckFneYXZQ+kO7p1ZZP3I2fzVzf28JxrIkBc= github.com/creachadair/command v0.0.0-20220426235536-a748effdf6a1/go.mod h1:bAM+qFQb/KwWyCc9MLC4U1jvn3XyakqP5QRkds5T6cY= @@ -1018,8 +1019,9 @@ github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155 github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= github.com/spf13/cobra v1.3.0/go.mod h1:BrRVncBjOJa/eUcVVm9CE+oC6as8k+VYr4NY7WCi9V4= -github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q= github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= +github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU= +github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= From e9c87a3c493d239cbe2f911550f367b8cb1ac8ff Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Tue, 21 Jun 2022 20:20:04 -0400 Subject: [PATCH 03/21] remove dial wake change (#8824) --- internal/p2p/router.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 015c8610d..1bfd014fc 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -574,11 +574,6 @@ func (r *Router) dialSleep(ctx context.Context) { } r.options.DialSleep(ctx) - - if !r.peerManager.HasDialedMaxPeers() { - r.peerManager.dialWaker.Wake() - } - } // acceptPeers accepts inbound connections from peers on the given transport, From 24701cd587d732c16fa31c7117ad91d48f1a3a71 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 21 Jun 2022 21:27:28 -0400 Subject: [PATCH 04/21] p2p: more dial routines (#8827) (#8828) --- internal/p2p/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 1bfd014fc..81a8928a4 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -530,7 +530,7 @@ func (r *Router) routeChannel( func (r *Router) numConcurrentDials() int { if r.options.NumConcurrentDials == nil { - return runtime.NumCPU() + return runtime.NumCPU() * 32 } return r.options.NumConcurrentDials() From df9363c67c6835df96dad752a63b8693b7413c9b Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 22 Jun 2022 11:54:03 -0700 Subject: [PATCH 05/21] Prepare changelog for Release v0.35.7 (#8772) --- CHANGELOG.md | 20 ++++++++++++++++++++ CHANGELOG_PENDING.md | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d321b6d8e..1d467f29e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,29 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/cosmos). +## v0.35.7 + +June 16, 2022 + +### BUG FIXES + +- [p2p] [\#8692](https://github.com/tendermint/tendermint/pull/8692) scale the number of stored peers by the configured maximum connections (#8684) +- [rpc] [\#8715](https://github.com/tendermint/tendermint/pull/8715) always close http bodies (backport #8712) +- [p2p] [\#8760](https://github.com/tendermint/tendermint/pull/8760) accept should not abort on first error (backport #8759) + +### BREAKING CHANGES + +- P2P Protocol + + - [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Introduce "inactive" peer label to avoid re-dialing incompatible peers. (@tychoish) + - [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Increase frequency of dialing attempts to reduce latency for peer acquisition. (@tychoish) + - [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Improvements to peer scoring and sorting to gossip a greater variety of peers during PEX. (@tychoish) + - [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Track incoming and outgoing peers separately to ensure more peer slots open for incoming connections. (@tychoish) + ## v0.35.6 June 3, 2022 + ### FEATURES - [migrate] [\#8672](https://github.com/tendermint/tendermint/pull/8672) provide function for database production (backport #8614) (@tychoish) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 2e820b727..cbc727d49 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -2,7 +2,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/cosmos). -## v0.35.7 +## v0.35.8 Month DD, YYYY From 9daea43375e06816a44c30bdb7b02701df4a5f76 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 22 Jun 2022 15:16:58 -0700 Subject: [PATCH 06/21] Update default version marker. (#8844) --- version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/version.go b/version/version.go index e2c8a6150..b0096fdca 100644 --- a/version/version.go +++ b/version/version.go @@ -10,7 +10,7 @@ const ( // TMVersionDefault is the used as the fallback version of Tendermint Core // when not using git describe. It is formatted with semantic versioning. - TMVersionDefault = "0.35.6" + TMVersionDefault = "0.35.7" // ABCISemVer is the semantic version of the ABCI library ABCISemVer = "0.17.0" From 8ef63fe3d90ae3ccc8cf0fb24236473105a56db9 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 23 Jun 2022 10:46:51 -0400 Subject: [PATCH 07/21] e2e: report peer heights in error message (#8843) (#8853) (cherry picked from commit 52b2efb8274879468af9d032afd844b300b40f6e) Co-authored-by: Sam Kleinman --- test/e2e/runner/rpc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/runner/rpc.go b/test/e2e/runner/rpc.go index ad5fa7a64..f608742a8 100644 --- a/test/e2e/runner/rpc.go +++ b/test/e2e/runner/rpc.go @@ -22,7 +22,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty clients = map[string]*rpchttp.HTTP{} lastHeight int64 lastIncrease = time.Now() - nodesAtHeight = map[string]struct{}{} + nodesAtHeight = map[string]int64{} numRunningNodes int ) if height == 0 { @@ -86,7 +86,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty // add this node to the set of target // height nodes - nodesAtHeight[node.Name] = struct{}{} + nodesAtHeight[node.Name] = result.SyncInfo.LatestBlockHeight // if not all of the nodes that we // have clients for have reached the From 3398f3797910f43f4b8b7394b3a122009ed6e2df Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 23 Jun 2022 18:25:19 +0200 Subject: [PATCH 08/21] cmd: add tool for compaction of goleveldb (backport #8564) (#8675) --- CHANGELOG_PENDING.md | 2 + cmd/tendermint/commands/compact.go | 69 ++++++++++++++++++++++++++++++ cmd/tendermint/main.go | 1 + go.mod | 1 + 4 files changed, 73 insertions(+) create mode 100644 cmd/tendermint/commands/compact.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index cbc727d49..9c518842b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -22,6 +22,8 @@ Special thanks to external contributors on this release: ### FEATURES +- [cli] [\#8675] Add command to force compact goleveldb databases (@cmwaters) + ### IMPROVEMENTS ### BUG FIXES diff --git a/cmd/tendermint/commands/compact.go b/cmd/tendermint/commands/compact.go new file mode 100644 index 000000000..591788e58 --- /dev/null +++ b/cmd/tendermint/commands/compact.go @@ -0,0 +1,69 @@ +package commands + +import ( + "errors" + "path/filepath" + "sync" + + "github.com/spf13/cobra" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" + "github.com/tendermint/tendermint/libs/log" +) + +func MakeCompactDBCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "experimental-compact-goleveldb", + Short: "force compacts the tendermint storage engine (only GoLevelDB supported)", + Long: ` +This is a temporary utility command that performs a force compaction on the state +and blockstores to reduce disk space for a pruning node. This should only be run +once the node has stopped. This command will likely be omitted in the future after +the planned refactor to the storage engine. + +Currently, only GoLevelDB is supported. + `, + RunE: func(cmd *cobra.Command, args []string) error { + if config.DBBackend != "goleveldb" { + return errors.New("compaction is currently only supported with goleveldb") + } + + compactGoLevelDBs(config.RootDir, logger) + return nil + }, + } + + return cmd +} + +func compactGoLevelDBs(rootDir string, logger log.Logger) { + dbNames := []string{"state", "blockstore"} + o := &opt.Options{ + DisableSeeksCompaction: true, + } + wg := sync.WaitGroup{} + + for _, dbName := range dbNames { + dbName := dbName + wg.Add(1) + go func() { + defer wg.Done() + dbPath := filepath.Join(rootDir, "data", dbName+".db") + store, err := leveldb.OpenFile(dbPath, o) + if err != nil { + logger.Error("failed to initialize tendermint db", "path", dbPath, "err", err) + return + } + defer store.Close() + + logger.Info("starting compaction...", "db", dbPath) + + err = store.CompactRange(util.Range{Start: nil, Limit: nil}) + if err != nil { + logger.Error("failed to compact tendermint db", "path", dbPath, "err", err) + } + }() + } + wg.Wait() +} diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index b0ca549e5..0aea8567c 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -32,6 +32,7 @@ func main() { cmd.InspectCmd, cmd.RollbackStateCmd, cmd.MakeKeyMigrateCommand(), + cmd.MakeCompactDBCommand(), debug.DebugCmd, cli.NewCompletionCmd(rootCmd, true), ) diff --git a/go.mod b/go.mod index a72269ebc..282818d4b 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.7.2 + github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca github.com/tendermint/tm-db v0.6.6 github.com/vektra/mockery/v2 v2.13.1 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e From 6f4ef729641e718515a785f728fcf867fd6653f6 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 23 Jun 2022 13:21:46 -0400 Subject: [PATCH 09/21] p2p: track peers by address (#8841) (#8855) (cherry picked from commit 436a38f8768f32c2abf3668e586019dcc3defb04) Co-authored-by: Sam Kleinman --- internal/p2p/peermanager.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index c5a431b7e..65d0847e1 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -575,6 +575,10 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) { continue } + if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) { + continue + } + // We now have an eligible address to dial. If we're full but have // upgrade capacity (as checked above), we find a lower-scored peer // we can replace and mark it as upgrading so noone else claims it. @@ -1263,6 +1267,7 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration type peerStore struct { db dbm.DB peers map[types.NodeID]*peerInfo + index map[NodeAddress]types.NodeID ranked []*peerInfo // cache for Ranked(), nil invalidates cache } @@ -1282,6 +1287,7 @@ func newPeerStore(db dbm.DB) (*peerStore, error) { // loadPeers loads all peers from the database into memory. func (s *peerStore) loadPeers() error { peers := map[types.NodeID]*peerInfo{} + addrs := map[NodeAddress]types.NodeID{} start, end := keyPeerInfoRange() iter, err := s.db.Iterator(start, end) @@ -1301,11 +1307,18 @@ func (s *peerStore) loadPeers() error { return fmt.Errorf("invalid peer data: %w", err) } peers[peer.ID] = peer + for addr := range peer.AddressInfo { + // TODO maybe check to see if we've seen this + // addr before for a different peer, there + // could be duplicates. + addrs[addr] = peer.ID + } } if iter.Error() != nil { return iter.Error() } s.peers = peers + s.index = addrs s.ranked = nil // invalidate cache if populated return nil } @@ -1317,6 +1330,12 @@ func (s *peerStore) Get(id types.NodeID) (peerInfo, bool) { return peer.Copy(), ok } +// Resolve returns the peer ID for a given node address if known. +func (s *peerStore) Resolve(addr NodeAddress) (types.NodeID, bool) { + id, ok := s.index[addr] + return id, ok +} + // Set stores peer data. The input data will be copied, and can safely be reused // by the caller. func (s *peerStore) Set(peer peerInfo) error { @@ -1345,20 +1364,29 @@ func (s *peerStore) Set(peer peerInfo) error { // update the existing pointer address. *current = peer } + for addr := range peer.AddressInfo { + s.index[addr] = peer.ID + } return nil } // Delete deletes a peer, or does nothing if it does not exist. func (s *peerStore) Delete(id types.NodeID) error { - if _, ok := s.peers[id]; !ok { + peer, ok := s.peers[id] + if !ok { return nil } - if err := s.db.Delete(keyPeerInfo(id)); err != nil { - return err + for _, addr := range peer.AddressInfo { + delete(s.index, addr.Address) } delete(s.peers, id) s.ranked = nil + + if err := s.db.Delete(keyPeerInfo(id)); err != nil { + return err + } + return nil } From 2df4c2b19d21c408ad0a08496acdd55a24aedf33 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 23 Jun 2022 14:46:10 -0400 Subject: [PATCH 10/21] e2e: add tolerance to peer discovery test (#8849) (#8857) (cherry picked from commit fb209136f85083e1d491ac2589fff41dd80518cb) Co-authored-by: Callum Waters Co-authored-by: Sam Kleinman --- test/e2e/tests/net_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/e2e/tests/net_test.go b/test/e2e/tests/net_test.go index 081dcfdfc..0daf0cfd4 100644 --- a/test/e2e/tests/net_test.go +++ b/test/e2e/tests/net_test.go @@ -17,7 +17,9 @@ func TestNet_Peers(t *testing.T) { netInfo, err := client.NetInfo(ctx) require.NoError(t, err) - expectedPeers := len(node.Testnet.Nodes) + // FIXME: https://github.com/tendermint/tendermint/issues/8848 + // We should be able to assert that we can discover all peers in a network + expectedPeers := len(node.Testnet.Nodes) - 1 // includes extra tolerance peers := make(map[string]*e2e.Node, 0) seen := map[string]bool{} for _, n := range node.Testnet.Nodes { @@ -30,7 +32,7 @@ func TestNet_Peers(t *testing.T) { seen[n.Name] = false } - require.Equal(t, expectedPeers, netInfo.NPeers, + require.GreaterOrEqual(t, netInfo.NPeers, expectedPeers, "node is not fully meshed with peers") for _, peerInfo := range netInfo.Peers { From 826f224c2dc1601eaaed6d70bc0f5a70d6d88871 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 24 Jun 2022 10:42:58 -0400 Subject: [PATCH 11/21] p2p: add eviction metrics and cleanup dialing error handling (backport #8819) (#8820) --- internal/p2p/metrics.go | 10 +++ internal/p2p/peermanager.go | 27 +++---- internal/p2p/peermanager_test.go | 132 +++++++++++-------------------- internal/p2p/router.go | 40 +++++----- internal/p2p/router_test.go | 7 -- test/e2e/runner/rpc.go | 2 +- 6 files changed, 91 insertions(+), 127 deletions(-) diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index 449f93bfc..35d8bb5ca 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -53,6 +53,9 @@ type Metrics struct { // this node. PeersConnectedIncoming metrics.Gauge + // Number of peers evicted by this node. + PeersEvicted metrics.Counter + // RouterPeerQueueRecv defines the time taken to read off of a peer's queue // before sending on the connection. RouterPeerQueueRecv metrics.Histogram @@ -110,6 +113,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peers_connected_success", Help: "Number of successful peer connection attempts", }, labels).With(labelsAndValues...), + PeersEvicted: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peers_evicted", + Help: "Number of connected peers evicted", + }, labels).With(labelsAndValues...), PeersConnectedFailure: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -200,6 +209,7 @@ func NopMetrics() *Metrics { PeersConnectedIncoming: discard.NewGauge(), PeersConnectedOutgoing: discard.NewGauge(), PeersInactivated: discard.NewGauge(), + PeersEvicted: discard.NewCounter(), PeerReceiveBytesTotal: discard.NewCounter(), PeerSendBytesTotal: discard.NewCounter(), PeerPendingSendBytes: discard.NewGauge(), diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 65d0847e1..f4213ce6f 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -534,12 +534,13 @@ func (m *PeerManager) HasDialedMaxPeers() bool { // returned peer. func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) { for { - address, err := m.TryDialNext() - if err != nil || (address != NodeAddress{}) { - return address, err + if address := m.TryDialNext(); (address != NodeAddress{}) { + return address, nil } + select { case <-m.dialWaker.Sleep(): + continue case <-ctx.Done(): return NodeAddress{}, ctx.Err() } @@ -548,21 +549,20 @@ func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) { // TryDialNext is equivalent to DialNext(), but immediately returns an empty // address if no peers or connection slots are available. -func (m *PeerManager) TryDialNext() (NodeAddress, error) { +func (m *PeerManager) TryDialNext() NodeAddress { m.mtx.Lock() defer m.mtx.Unlock() // We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including // MaxConnectedUpgrade allows us to probe additional peers that have a // higher score than any other peers, and if successful evict it. - if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= - int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) { - return NodeAddress{}, nil + if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) { + return NodeAddress{} } cinfo := m.getConnectedInfo() if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections { - return NodeAddress{}, nil + return NodeAddress{} } for _, peer := range m.store.Ranked() { @@ -589,16 +589,16 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) { if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score()) if upgradeFromPeer == "" { - return NodeAddress{}, nil + return NodeAddress{} } m.upgrading[upgradeFromPeer] = peer.ID } m.dialing[peer.ID] = true - return addressInfo.Address, nil + return addressInfo.Address } } - return NodeAddress{}, nil + return NodeAddress{} } // DialFailed reports a failed dial attempt. This will make the peer available @@ -706,8 +706,7 @@ func (m *PeerManager) Dialed(address NodeAddress) error { return err } - if upgradeFromPeer != "" && m.options.MaxConnected > 0 && - len(m.connected) >= int(m.options.MaxConnected) { + if upgradeFromPeer != "" && m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { // Look for an even lower-scored peer that may have appeared since we // started the upgrade. if p, ok := m.store.Get(upgradeFromPeer); ok { @@ -716,11 +715,11 @@ func (m *PeerManager) Dialed(address NodeAddress) error { } } m.evict[upgradeFromPeer] = true + m.evictWaker.Wake() } m.metrics.PeersConnectedOutgoing.Add(1) m.connected[peer.ID] = peerConnectionOutgoing - m.evictWaker.Wake() return nil } diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go index 372182cd0..7d4d8e027 100644 --- a/internal/p2p/peermanager_test.go +++ b/internal/p2p/peermanager_test.go @@ -378,16 +378,14 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) // Add b. We shouldn't be able to dial it, due to MaxConnected. added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Spawn a goroutine to fail a's dial attempt. @@ -415,8 +413,7 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.DialFailed(dial)) failed := time.Now() @@ -443,8 +440,7 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) { err = peerManager.Accepted(a.NodeID) require.NoError(t, err) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Zero(t, dial) go func() { @@ -473,8 +469,7 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -482,16 +477,14 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // At this point, adding c will not allow dialing it. added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -520,7 +513,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() + dial := peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -529,8 +522,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // Even though we are at capacity, we should be allowed to dial c for an @@ -538,8 +530,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, c, dial) // However, since we're using all upgrade slots now, we can't add and dial @@ -547,16 +538,14 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(d) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // We go through with c's upgrade. require.NoError(t, peerManager.Dialed(c)) // Still can't dial d. - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Now, if we disconnect a, we should be allowed to dial d because we have a @@ -572,8 +561,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(e) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -593,8 +581,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -602,8 +589,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // Adding c and dialing it will fail, because a is the only connected @@ -611,8 +597,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Empty(t, dial) } @@ -633,22 +618,19 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) // Adding a's TCP address will not dispense a, since it's already dialing. added, err = peerManager.Add(aTCP) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Marking a as dialed will still not dispense it. require.NoError(t, peerManager.Dialed(a)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Adding b and accepting a connection from it will not dispense it either. @@ -656,8 +638,7 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) { require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(bID)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -683,16 +664,14 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) { // All addresses should be dispensed as long as dialing them has failed. dial := []p2p.NodeAddress{} for range addresses { - address, err := peerManager.TryDialNext() - require.NoError(t, err) + address := peerManager.TryDialNext() require.NotZero(t, address) require.NoError(t, peerManager.DialFailed(address)) dial = append(dial, address) } require.ElementsMatch(t, dial, addresses) - address, err := peerManager.TryDialNext() - require.NoError(t, err) + address := peerManager.TryDialNext() require.Zero(t, address) } @@ -714,15 +693,14 @@ func TestPeerManager_DialFailed(t *testing.T) { // Dialing and then calling DialFailed with a different address (same // NodeID) should unmark as dialing and allow us to dial the other address // again, but not register the failed address. - dial, err := peerManager.TryDialNext() + dial := peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, a, dial) require.NoError(t, peerManager.DialFailed(p2p.NodeAddress{ Protocol: "tcp", NodeID: aID, Hostname: "localhost"})) require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, a, dial) // Calling DialFailed on same address twice should be fine. @@ -753,8 +731,7 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -762,8 +739,7 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // Adding c and dialing it will fail, even though it could upgrade a and we @@ -772,14 +748,12 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Empty(t, dial) // Failing b's dial will now make c available for dialing. require.NoError(t, peerManager.DialFailed(b)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, c, dial) } @@ -794,8 +768,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -805,8 +778,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) require.NoError(t, peerManager.Accepted(b.NodeID)) @@ -835,8 +807,7 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) // Marking b as dialed in the meanwhile (even without TryDialNext) @@ -878,8 +849,7 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, c, dial) require.NoError(t, peerManager.Dialed(c)) @@ -923,8 +893,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, b, dial) require.NoError(t, peerManager.Dialed(b)) @@ -933,8 +902,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Empty(t, dial) // a should now be evicted. @@ -977,8 +945,7 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, c, dial) // In the meanwhile, a disconnects and d connects. d is even lower-scored @@ -1028,7 +995,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() + dial := peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, c, dial) @@ -1074,8 +1041,7 @@ func TestPeerManager_Accepted(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, c, dial) require.NoError(t, peerManager.Accepted(c.NodeID)) require.Error(t, peerManager.Dialed(c)) @@ -1084,8 +1050,7 @@ func TestPeerManager_Accepted(t *testing.T) { added, err = peerManager.Add(d) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, d, dial) require.NoError(t, peerManager.Dialed(d)) require.Error(t, peerManager.Accepted(d.NodeID)) @@ -1233,8 +1198,7 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, b, dial) // a has already been claimed as an upgrade of a, so accepting @@ -1394,8 +1358,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) { added, err := peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, b, dial) require.NoError(t, peerManager.Dialed(b)) }() @@ -1521,13 +1484,11 @@ func TestPeerManager_Disconnected(t *testing.T) { // Disconnecting a dialing peer does not unmark it as dialing, to avoid // dialing it multiple times in parallel. - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) peerManager.Disconnected(a.NodeID) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -1595,8 +1556,7 @@ func TestPeerManager_Subscribe(t *testing.T) { require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates()) // Outbound connection with peer error and eviction. - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.Empty(t, sub.Updates()) @@ -1619,8 +1579,7 @@ func TestPeerManager_Subscribe(t *testing.T) { require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates()) // Outbound connection with dial failure. - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, a, dial) require.Empty(t, sub.Updates()) @@ -1716,8 +1675,7 @@ func TestPeerManager_Close(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.DialFailed(a)) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 81a8928a4..3e52f4c05 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -291,7 +291,7 @@ func NewRouter( router := &Router{ logger: logger, - metrics: metrics, + metrics: NopMetrics(), nodeInfo: nodeInfo, privKey: privKey, connTracker: newConnTracker( @@ -312,6 +312,10 @@ func NewRouter( router.BaseService = service.NewBaseService(logger, "router", router) + if metrics != nil { + router.metrics = metrics + } + qf, err := router.createQueueFactory() if err != nil { return nil, err @@ -422,11 +426,7 @@ func (r *Router) routeChannel( ) { for { select { - case envelope, ok := <-outCh: - if !ok { - return - } - + case envelope := <-outCh: // Mark the envelope with the channel ID to allow sendPeer() to pass // it on to Transport.SendMessage(). envelope.channelID = chID @@ -503,20 +503,22 @@ func (r *Router) routeChannel( } } - case peerError, ok := <-errCh: - if !ok { - return - } - - shouldEvict := peerError.Fatal || r.peerManager.HasMaxPeerCapacity() + case peerError := <-errCh: + maxPeerCapacity := r.peerManager.HasMaxPeerCapacity() r.logger.Error("peer error", "peer", peerError.NodeID, "err", peerError.Err, - "evicting", shouldEvict, + "disconnecting", peerError.Fatal || maxPeerCapacity, ) - if shouldEvict { + + if peerError.Fatal || maxPeerCapacity { + // if the error is fatal or all peer + // slots are in use, we can error + // (disconnect) from the peer. r.peerManager.Errored(peerError.NodeID, peerError.Err) } else { + // this just decrements the peer + // score. r.peerManager.processPeerEvent(PeerUpdate{ NodeID: peerError.NodeID, Status: PeerStatusBad, @@ -697,9 +699,8 @@ LOOP: case errors.Is(err, context.Canceled): r.logger.Debug("stopping dial routine") break LOOP - case err != nil: - r.logger.Error("failed to find next peer to dial", "err", err) - break LOOP + case address == NodeAddress{}: + continue LOOP } select { @@ -709,7 +710,7 @@ LOOP: // create connections too quickly. r.dialSleep(ctx) - continue + continue LOOP case <-ctx.Done(): close(addresses) break LOOP @@ -748,6 +749,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) { if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil { r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err) + r.peerManager.dialWaker.Wake() conn.Close() return } @@ -1035,6 +1037,8 @@ func (r *Router) evictPeers() { queue, ok := r.peerQueues[peerID] r.peerMtx.RUnlock() + r.metrics.PeersEvicted.Add(1) + if ok { queue.close() } diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index e8494fdf4..317750d03 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -133,13 +133,6 @@ func TestRouter_Channel_Basic(t *testing.T) { require.NoError(t, err) require.Contains(t, router.NodeInfo().Channels, chDesc2.ID) - // Closing the channel, then opening it again should be fine. - channel.Close() - time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async... - - channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0) - require.NoError(t, err) - // We should be able to send on the channel, even though there are no peers. p2ptest.RequireSend(t, channel, p2p.Envelope{ To: types.NodeID(strings.Repeat("a", 40)), diff --git a/test/e2e/runner/rpc.go b/test/e2e/runner/rpc.go index f608742a8..0bf982480 100644 --- a/test/e2e/runner/rpc.go +++ b/test/e2e/runner/rpc.go @@ -111,7 +111,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty if len(clients) == 0 { return nil, nil, errors.New("unable to connect to any network nodes") } - if time.Since(lastIncrease) >= time.Minute { + if time.Since(lastIncrease) >= 90*time.Second { if lastHeight == 0 { return nil, nil, errors.New("chain stalled at unknown height (most likely upon starting)") } From 19b98c700585a4b329cf479f5cef81f726b6020e Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 24 Jun 2022 13:22:26 -0400 Subject: [PATCH 12/21] e2e: disable another network test (#8862) (#8873) Follow up on: https://github.com/tendermint/tendermint/pull/8849 (cherry picked from commit c4d24eed7d0b8902c41d790e1de446baa0256e6b) Co-authored-by: Callum Waters --- test/e2e/tests/net_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/e2e/tests/net_test.go b/test/e2e/tests/net_test.go index 0daf0cfd4..9dafd00d3 100644 --- a/test/e2e/tests/net_test.go +++ b/test/e2e/tests/net_test.go @@ -19,7 +19,7 @@ func TestNet_Peers(t *testing.T) { // FIXME: https://github.com/tendermint/tendermint/issues/8848 // We should be able to assert that we can discover all peers in a network - expectedPeers := len(node.Testnet.Nodes) - 1 // includes extra tolerance + expectedPeers := len(node.Testnet.Nodes) peers := make(map[string]*e2e.Node, 0) seen := map[string]bool{} for _, n := range node.Testnet.Nodes { @@ -32,7 +32,7 @@ func TestNet_Peers(t *testing.T) { seen[n.Name] = false } - require.GreaterOrEqual(t, netInfo.NPeers, expectedPeers, + require.GreaterOrEqual(t, netInfo.NPeers, expectedPeers-1, "node is not fully meshed with peers") for _, peerInfo := range netInfo.Peers { @@ -44,8 +44,10 @@ func TestNet_Peers(t *testing.T) { seen[peer.Name] = true } - for name := range seen { - require.True(t, seen[name], "node %v not peered with %v", node.Name, name) - } + // FIXME: https://github.com/tendermint/tendermint/issues/8848 + // We should be able to assert that we can discover all peers in a network + // for name := range seen { + // require.True(t, seen[name], "node %v not peered with %v", node.Name, name) + // } }) } From f19e52e6f2b79760a0e0fc9b50d8146102cd1ee2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 27 Jun 2022 09:13:46 -0400 Subject: [PATCH 13/21] build(deps): Bump styfle/cancel-workflow-action from 0.9.1 to 0.10.0 (#8882) Bumps [styfle/cancel-workflow-action](https://github.com/styfle/cancel-workflow-action) from 0.9.1 to 0.10.0. - [Release notes](https://github.com/styfle/cancel-workflow-action/releases) - [Commits](https://github.com/styfle/cancel-workflow-action/compare/0.9.1...0.10.0) --- updated-dependencies: - dependency-name: styfle/cancel-workflow-action dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/janitor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/janitor.yml b/.github/workflows/janitor.yml index e6bc45ec1..ceb21941d 100644 --- a/.github/workflows/janitor.yml +++ b/.github/workflows/janitor.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 3 steps: - - uses: styfle/cancel-workflow-action@0.9.1 + - uses: styfle/cancel-workflow-action@0.10.0 with: workflow_id: 1041851,1401230,2837803 access_token: ${{ github.token }} From c4ef56607178852fe0e1f4caa37409533935c96b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 27 Jun 2022 10:49:51 -0400 Subject: [PATCH 14/21] p2p: remove dial sleep and provide disconnect cooldown (backport #8839) (#8875) (cherry picked from commit 52b6dc19badf50938ae2b2a1d2e22813614e5ad5) --- internal/p2p/p2ptest/network.go | 3 +-- internal/p2p/peermanager.go | 34 +++++++++++++++++++++++++++---- internal/p2p/router.go | 36 --------------------------------- internal/p2p/router_test.go | 2 -- node/setup.go | 23 +++++++++++---------- 5 files changed, 43 insertions(+), 55 deletions(-) diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 9115b70ee..7f760e968 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -1,7 +1,6 @@ package p2ptest import ( - "context" "math/rand" "testing" "time" @@ -254,7 +253,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node { privKey, peerManager, []p2p.Transport{transport}, - p2p.RouterOptions{DialSleep: func(_ context.Context) {}}, + p2p.RouterOptions{}, ) require.NoError(t, err) require.NoError(t, router.Start()) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index f4213ce6f..52d1cfe9f 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -162,6 +162,10 @@ type PeerManagerOptions struct { // retry times, to avoid thundering herds. 0 disables jitter. RetryTimeJitter time.Duration + // DisconnectCooldownPeriod is the amount of time after we + // disconnect from a peer before we'll consider dialing a new peer + DisconnectCooldownPeriod time.Duration + // PeerScores sets fixed scores for specific peers. It is mainly used // for testing. A score of 0 is ignored. PeerScores map[types.NodeID]PeerScore @@ -570,6 +574,10 @@ func (m *PeerManager) TryDialNext() NodeAddress { continue } + if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod { + continue + } + for _, addressInfo := range peer.AddressInfo { if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) { continue @@ -888,6 +896,22 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) { delete(m.evicting, peerID) delete(m.ready, peerID) + if peer, ok := m.store.Get(peerID); ok { + peer.LastDisconnected = time.Now() + _ = m.store.Set(peer) + // launch a thread to ping the dialWaker when the + // disconnected peer can be dialed again. + go func() { + timer := time.NewTimer(m.options.DisconnectCooldownPeriod) + defer timer.Stop() + select { + case <-timer.C: + m.dialWaker.Wake() + case <-m.closeCh: + } + }() + } + if ready { m.broadcast(PeerUpdate{ NodeID: peerID, @@ -1474,9 +1498,10 @@ func (s *peerStore) Size() int { // peerInfo contains peer information stored in a peerStore. type peerInfo struct { - ID types.NodeID - AddressInfo map[NodeAddress]*peerAddressInfo - LastConnected time.Time + ID types.NodeID + AddressInfo map[NodeAddress]*peerAddressInfo + LastConnected time.Time + LastDisconnected time.Time // These fields are ephemeral, i.e. not persisted to the database. Persistent bool @@ -1516,8 +1541,8 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) { func (p *peerInfo) ToProto() *p2pproto.PeerInfo { msg := &p2pproto.PeerInfo{ ID: string(p.ID), - LastConnected: &p.LastConnected, Inactive: p.Inactive, + LastConnected: &p.LastConnected, } for _, addressInfo := range p.AddressInfo { msg.AddressInfo = append(msg.AddressInfo, addressInfo.ToProto()) @@ -1525,6 +1550,7 @@ func (p *peerInfo) ToProto() *p2pproto.PeerInfo { if msg.LastConnected.IsZero() { msg.LastConnected = nil } + return msg } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 3e52f4c05..6b056ae53 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math/rand" "net" "runtime" "sync" @@ -160,12 +159,6 @@ type RouterOptions struct { // return an error to reject the peer. FilterPeerByID func(context.Context, types.NodeID) error - // DialSleep controls the amount of time that the router - // sleeps between dialing peers. If not set, a default value - // is used that sleeps for a (random) amount of time up to 3 - // seconds between submitting each peer to be dialed. - DialSleep func(context.Context) - // NumConcrruentDials controls how many parallel go routines // are used to dial peers. This defaults to the value of // runtime.NumCPU. @@ -554,30 +547,6 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error { return r.options.FilterPeerByID(ctx, id) } -func (r *Router) dialSleep(ctx context.Context) { - if r.options.DialSleep == nil { - const ( - maxDialerInterval = 500 - minDialerInterval = 100 - ) - - // nolint:gosec // G404: Use of weak random number generator - dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval) - - timer := time.NewTimer(dur * time.Millisecond) - defer timer.Stop() - - select { - case <-ctx.Done(): - case <-timer.C: - } - - return - } - - r.options.DialSleep(ctx) -} - // acceptPeers accepts inbound connections from peers on the given transport, // and spawns goroutines that route messages to/from them. func (r *Router) acceptPeers(transport Transport) { @@ -705,11 +674,6 @@ LOOP: select { case addresses <- address: - // this jitters the frequency that we call - // DialNext and prevents us from attempting to - // create connections too quickly. - - r.dialSleep(ctx) continue LOOP case <-ctx.Done(): close(addresses) diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 317750d03..8bdc21cbb 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -1,7 +1,6 @@ package p2p_test import ( - "context" "errors" "fmt" "io" @@ -664,7 +663,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { peerManager, []p2p.Transport{mockTransport}, p2p.RouterOptions{ - DialSleep: func(_ context.Context) {}, NumConcurrentDials: func() int { ncpu := runtime.NumCPU() if ncpu <= 3 { diff --git a/node/setup.go b/node/setup.go index 28a61a0d4..26a07e9ef 100644 --- a/node/setup.go +++ b/node/setup.go @@ -504,17 +504,18 @@ func createPeerManager( const maxUpgradeConns = 4 options := p2p.PeerManagerOptions{ - SelfAddress: selfAddr, - MaxConnected: maxConns, - MaxOutgoingConnections: maxOutgoingConns, - MaxConnectedUpgrade: maxUpgradeConns, - MaxPeers: maxUpgradeConns + 4*maxConns, - MinRetryTime: 250 * time.Millisecond, - MaxRetryTime: 30 * time.Minute, - MaxRetryTimePersistent: 5 * time.Minute, - RetryTimeJitter: 5 * time.Second, - PrivatePeers: privatePeerIDs, - Metrics: metrics, + SelfAddress: selfAddr, + MaxConnected: maxConns, + MaxOutgoingConnections: maxOutgoingConns, + MaxConnectedUpgrade: maxUpgradeConns, + DisconnectCooldownPeriod: 2 * time.Second, + MaxPeers: maxUpgradeConns + 4*maxConns, + MinRetryTime: 250 * time.Millisecond, + MaxRetryTime: 30 * time.Minute, + MaxRetryTimePersistent: 5 * time.Minute, + RetryTimeJitter: 5 * time.Second, + PrivatePeers: privatePeerIDs, + Metrics: metrics, } peers := []p2p.NodeAddress{} From 978f754ad3f928d5e067bf13037759affa911e3c Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Tue, 28 Jun 2022 16:07:15 -0400 Subject: [PATCH 15/21] p2p: set empty timeouts to configed values. (manual backport of #8847) (#8869) * regenerate mocks using newer style * p2p: set empty timeouts to small values. (#8847) These timeouts default to 'do not time out' if they are not set. This times up resources, potentially indefinitely. If node on the other side of the the handshake is up but unresponsive, the[ handshake call](https://github.com/tendermint/tendermint/blob/edec79448aa1d62b84683b1b22e12e145dbdda7c/internal/p2p/router.go#L720) will _never_ return. * fix light client select statement --- abci/client/mocks/client.go | 15 ++++++++ internal/consensus/mocks/cons_sync_reactor.go | 15 ++++++++ internal/evidence/mocks/block_store.go | 15 ++++++++ internal/p2p/mocks/connection.go | 35 ++++++++++++++----- internal/p2p/mocks/peer.go | 15 ++++++++ internal/p2p/mocks/transport.go | 15 ++++++++ internal/p2p/peer_test.go | 6 ++-- internal/p2p/router.go | 8 +---- internal/p2p/router_test.go | 14 ++++---- internal/p2p/switch.go | 6 ++-- internal/p2p/switch_test.go | 8 ++--- internal/p2p/test_util.go | 2 +- internal/p2p/transport.go | 3 +- internal/p2p/transport_mconn.go | 19 ++++++++-- internal/p2p/transport_memory.go | 8 +++++ internal/p2p/transport_test.go | 12 +++---- internal/proxy/mocks/app_conn_consensus.go | 15 ++++++++ internal/proxy/mocks/app_conn_mempool.go | 15 ++++++++ internal/proxy/mocks/app_conn_query.go | 15 ++++++++ internal/proxy/mocks/app_conn_snapshot.go | 15 ++++++++ internal/state/indexer/mocks/event_sink.go | 15 ++++++++ internal/state/mocks/block_store.go | 15 ++++++++ internal/state/mocks/evidence_pool.go | 15 ++++++++ internal/state/mocks/store.go | 15 ++++++++ internal/statesync/mocks/state_provider.go | 15 ++++++++ light/client.go | 11 +++--- light/provider/mocks/provider.go | 15 ++++++++ light/rpc/mocks/light_client.go | 15 ++++++++ node/node.go | 4 ++- rpc/client/mocks/client.go | 15 ++++++++ scripts/mockery_generate.sh | 14 +++++++- 31 files changed, 354 insertions(+), 51 deletions(-) diff --git a/abci/client/mocks/client.go b/abci/client/mocks/client.go index 664646e61..cfe5ea7af 100644 --- a/abci/client/mocks/client.go +++ b/abci/client/mocks/client.go @@ -801,3 +801,18 @@ func (_m *Client) String() string { func (_m *Client) Wait() { _m.Called() } + +type NewClientT interface { + mock.TestingT + Cleanup(func()) +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewClient(t NewClientT) *Client { + mock := &Client{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/consensus/mocks/cons_sync_reactor.go b/internal/consensus/mocks/cons_sync_reactor.go index 5ac592f0d..2c694742b 100644 --- a/internal/consensus/mocks/cons_sync_reactor.go +++ b/internal/consensus/mocks/cons_sync_reactor.go @@ -26,3 +26,18 @@ func (_m *ConsSyncReactor) SetStateSyncingMetrics(_a0 float64) { func (_m *ConsSyncReactor) SwitchToConsensus(_a0 state.State, _a1 bool) { _m.Called(_a0, _a1) } + +type NewConsSyncReactorT interface { + mock.TestingT + Cleanup(func()) +} + +// NewConsSyncReactor creates a new instance of ConsSyncReactor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewConsSyncReactor(t NewConsSyncReactorT) *ConsSyncReactor { + mock := &ConsSyncReactor{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/evidence/mocks/block_store.go b/internal/evidence/mocks/block_store.go index ef3346b2a..b0c67ff87 100644 --- a/internal/evidence/mocks/block_store.go +++ b/internal/evidence/mocks/block_store.go @@ -57,3 +57,18 @@ func (_m *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return r0 } + +type NewBlockStoreT interface { + mock.TestingT + Cleanup(func()) +} + +// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewBlockStore(t NewBlockStoreT) *BlockStore { + mock := &BlockStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/p2p/mocks/connection.go b/internal/p2p/mocks/connection.go index 6c6174117..e5ba9584a 100644 --- a/internal/p2p/mocks/connection.go +++ b/internal/p2p/mocks/connection.go @@ -13,6 +13,8 @@ import ( p2p "github.com/tendermint/tendermint/internal/p2p" + time "time" + types "github.com/tendermint/tendermint/types" ) @@ -49,20 +51,20 @@ func (_m *Connection) FlushClose() error { return r0 } -// Handshake provides a mock function with given fields: _a0, _a1, _a2 -func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) { - ret := _m.Called(_a0, _a1, _a2) +// Handshake provides a mock function with given fields: _a0, _a1, _a2, _a3 +func (_m *Connection) Handshake(_a0 context.Context, _a1 time.Duration, _a2 types.NodeInfo, _a3 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) { + ret := _m.Called(_a0, _a1, _a2, _a3) var r0 types.NodeInfo - if rf, ok := ret.Get(0).(func(context.Context, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok { - r0 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(0).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) types.NodeInfo); ok { + r0 = rf(_a0, _a1, _a2, _a3) } else { r0 = ret.Get(0).(types.NodeInfo) } var r1 crypto.PubKey - if rf, ok := ret.Get(1).(func(context.Context, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok { - r1 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(1).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) crypto.PubKey); ok { + r1 = rf(_a0, _a1, _a2, _a3) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(crypto.PubKey) @@ -70,8 +72,8 @@ func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 cry } var r2 error - if rf, ok := ret.Get(2).(func(context.Context, types.NodeInfo, crypto.PrivKey) error); ok { - r2 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(2).(func(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) error); ok { + r2 = rf(_a0, _a1, _a2, _a3) } else { r2 = ret.Error(2) } @@ -206,3 +208,18 @@ func (_m *Connection) TrySendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error return r0, r1 } + +type NewConnectionT interface { + mock.TestingT + Cleanup(func()) +} + +// NewConnection creates a new instance of Connection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewConnection(t NewConnectionT) *Connection { + mock := &Connection{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/p2p/mocks/peer.go b/internal/p2p/mocks/peer.go index b905c1156..021c905f2 100644 --- a/internal/p2p/mocks/peer.go +++ b/internal/p2p/mocks/peer.go @@ -332,3 +332,18 @@ func (_m *Peer) TrySend(_a0 byte, _a1 []byte) bool { func (_m *Peer) Wait() { _m.Called() } + +type NewPeerT interface { + mock.TestingT + Cleanup(func()) +} + +// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewPeer(t NewPeerT) *Peer { + mock := &Peer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/p2p/mocks/transport.go b/internal/p2p/mocks/transport.go index 82bd670cb..46d825501 100644 --- a/internal/p2p/mocks/transport.go +++ b/internal/p2p/mocks/transport.go @@ -119,3 +119,18 @@ func (_m *Transport) String() string { return r0 } + +type NewTransportT interface { + mock.TestingT + Cleanup(func()) +} + +// NewTransport creates a new instance of Transport. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewTransport(t NewTransportT) *Transport { + mock := &Transport{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/p2p/peer_test.go b/internal/p2p/peer_test.go index dfe7bc798..dad7b98b5 100644 --- a/internal/p2p/peer_test.go +++ b/internal/p2p/peer_test.go @@ -90,7 +90,7 @@ func createOutboundPeerAndPerformHandshake( if err != nil { return nil, err } - peerInfo, _, err := pc.conn.Handshake(context.Background(), ourNodeInfo, pk) + peerInfo, _, err := pc.conn.Handshake(context.Background(), 0, ourNodeInfo, pk) if err != nil { return nil, err } @@ -187,7 +187,7 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { if err != nil { return nil, err } - _, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey) + _, _, err = pc.conn.Handshake(context.Background(), 0, rp.nodeInfo(), rp.PrivKey) if err != nil { return nil, err } @@ -213,7 +213,7 @@ func (rp *remotePeer) accept() { if err != nil { golog.Printf("Failed to create a peer: %+v", err) } - _, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey) + _, _, err = pc.conn.Handshake(context.Background(), 0, rp.nodeInfo(), rp.PrivKey) if err != nil { golog.Printf("Failed to handshake a peer: %+v", err) } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 6b056ae53..7247642ca 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -795,13 +795,7 @@ func (r *Router) handshakePeer( expectID types.NodeID, ) (types.NodeInfo, crypto.PubKey, error) { - if r.options.HandshakeTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.options.HandshakeTimeout) - defer cancel() - } - - peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey) + peerInfo, peerKey, err := conn.Handshake(ctx, r.options.HandshakeTimeout, r.nodeInfo, r.privKey) if err != nil { return peerInfo, peerKey, err } diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 8bdc21cbb..556067d1f 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -344,7 +344,7 @@ func TestRouter_AcceptPeers(t *testing.T) { closer := tmsync.NewCloser() mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") - mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey). Return(tc.peerInfo, tc.peerKey, nil) mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil) mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{}) @@ -454,7 +454,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") - mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey). WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF) mockConnection.On("Close").Return(nil) mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{}) @@ -535,7 +535,7 @@ func TestRouter_DialPeers(t *testing.T) { mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") if tc.dialErr == nil { - mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey). Return(tc.peerInfo, tc.peerKey, nil) mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil) } @@ -622,7 +622,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") - mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey). WaitUntil(closeCh).Return(types.NodeInfo{}, nil, io.EOF) mockConnection.On("Close").Return(nil) @@ -701,7 +701,7 @@ func TestRouter_EvictPeers(t *testing.T) { mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") - mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey). Return(peerInfo, peerKey.PubKey(), nil) mockConnection.On("ReceiveMessage").WaitUntil(closeCh).Return(chID, nil, io.EOF) mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{}) @@ -770,7 +770,7 @@ func TestRouter_ChannelCompatability(t *testing.T) { mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") - mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey). Return(incompatiblePeer, peerKey.PubKey(), nil) mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{}) mockConnection.On("Close").Return(nil) @@ -819,7 +819,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") - mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). + mockConnection.On("Handshake", mock.Anything, mock.Anything, selfInfo, selfKey). Return(peer, peerKey.PubKey(), nil) mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{}) mockConnection.On("Close").Return(nil) diff --git a/internal/p2p/switch.go b/internal/p2p/switch.go index 60c0c7deb..9e1b0311b 100644 --- a/internal/p2p/switch.go +++ b/internal/p2p/switch.go @@ -865,11 +865,11 @@ func (sw *Switch) handshakePeer( c Connection, expectPeerID types.NodeID, ) (types.NodeInfo, crypto.PubKey, error) { - // Moved from transport and hardcoded until legacy P2P stack removal. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - peerInfo, peerKey, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + // Moved timeout from transport and hardcoded until legacy P2P stack removal. + peerInfo, peerKey, err := c.Handshake(ctx, 5*time.Second, sw.nodeInfo, sw.nodeKey.PrivKey) if err != nil { return peerInfo, peerKey, ErrRejected{ conn: c.(*mConnConnection).conn, diff --git a/internal/p2p/switch_test.go b/internal/p2p/switch_test.go index 8cb755c9f..c68cfceaf 100644 --- a/internal/p2p/switch_test.go +++ b/internal/p2p/switch_test.go @@ -267,7 +267,7 @@ func TestSwitchPeerFilter(t *testing.T) { if err != nil { t.Fatal(err) } - peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey) if err != nil { t.Fatal(err) } @@ -324,7 +324,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { if err != nil { t.Fatal(err) } - peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey) if err != nil { t.Fatal(err) } @@ -360,7 +360,7 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { if err != nil { t.Fatal(err) } - peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey) if err != nil { t.Fatal(err) } @@ -415,7 +415,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { if err != nil { t.Fatal(err) } - peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + peerInfo, _, err := c.Handshake(ctx, 0, sw.nodeInfo, sw.nodeKey.PrivKey) if err != nil { t.Fatal(err) } diff --git a/internal/p2p/test_util.go b/internal/p2p/test_util.go index b2851646d..ae21ba4d7 100644 --- a/internal/p2p/test_util.go +++ b/internal/p2p/test_util.go @@ -126,7 +126,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { } return err } - peerNodeInfo, _, err := pc.conn.Handshake(context.Background(), sw.nodeInfo, sw.nodeKey.PrivKey) + peerNodeInfo, _, err := pc.conn.Handshake(context.Background(), 0, sw.nodeInfo, sw.nodeKey.PrivKey) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) diff --git a/internal/p2p/transport.go b/internal/p2p/transport.go index a3245dfc8..2e4d26abd 100644 --- a/internal/p2p/transport.go +++ b/internal/p2p/transport.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "time" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/p2p/conn" @@ -84,7 +85,7 @@ type Connection interface { // FIXME: The handshake should really be the Router's responsibility, but // that requires the connection interface to be byte-oriented rather than // message-oriented (see comment above). - Handshake(context.Context, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) + Handshake(context.Context, time.Duration, types.NodeInfo, crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) // ReceiveMessage returns the next message received on the connection, // blocking until one is available. Returns io.EOF if closed. diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index eca261476..4d18d896b 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -9,6 +9,7 @@ import ( "net" "strconv" "sync" + "time" "golang.org/x/net/netutil" @@ -255,6 +256,7 @@ func newMConnConnection( // Handshake implements Connection. func (c *mConnConnection) Handshake( ctx context.Context, + timeout time.Duration, nodeInfo types.NodeInfo, privKey crypto.PrivKey, ) (types.NodeInfo, crypto.PubKey, error) { @@ -264,6 +266,12 @@ func (c *mConnConnection) Handshake( peerKey crypto.PubKey errCh = make(chan error, 1) ) + handshakeCtx := ctx + if timeout > 0 { + var cancel context.CancelFunc + handshakeCtx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } // To handle context cancellation, we need to do the handshake in a // goroutine and abort the blocking network calls by closing the connection // when the context is canceled. @@ -276,12 +284,17 @@ func (c *mConnConnection) Handshake( } }() var err error - mconn, peerInfo, peerKey, err = c.handshake(ctx, nodeInfo, privKey) - errCh <- err + mconn, peerInfo, peerKey, err = c.handshake(handshakeCtx, nodeInfo, privKey) + + select { + case errCh <- err: + case <-handshakeCtx.Done(): + } + }() select { - case <-ctx.Done(): + case <-handshakeCtx.Done(): _ = c.Close() return types.NodeInfo{}, nil, ctx.Err() diff --git a/internal/p2p/transport_memory.go b/internal/p2p/transport_memory.go index 09a387254..f2d1d0c72 100644 --- a/internal/p2p/transport_memory.go +++ b/internal/p2p/transport_memory.go @@ -7,6 +7,7 @@ import ( "io" "net" "sync" + "time" "github.com/tendermint/tendermint/crypto" tmsync "github.com/tendermint/tendermint/internal/libs/sync" @@ -270,9 +271,16 @@ func (c *MemoryConnection) Status() conn.ConnectionStatus { // Handshake implements Connection. func (c *MemoryConnection) Handshake( ctx context.Context, + timeout time.Duration, nodeInfo types.NodeInfo, privKey crypto.PrivKey, ) (types.NodeInfo, crypto.PubKey, error) { + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + select { case c.sendCh <- memoryMessage{nodeInfo: &nodeInfo, pubKey: privKey.PubKey()}: c.logger.Debug("sent handshake", "nodeInfo", nodeInfo) diff --git a/internal/p2p/transport_test.go b/internal/p2p/transport_test.go index 1b8ab77f5..de7405177 100644 --- a/internal/p2p/transport_test.go +++ b/internal/p2p/transport_test.go @@ -265,7 +265,7 @@ func TestConnection_Handshake(t *testing.T) { errCh := make(chan error, 1) go func() { // Must use assert due to goroutine. - peerInfo, peerKey, err := ba.Handshake(ctx, bInfo, bKey) + peerInfo, peerKey, err := ba.Handshake(ctx, 0, bInfo, bKey) if err == nil { assert.Equal(t, aInfo, peerInfo) assert.Equal(t, aKey.PubKey(), peerKey) @@ -273,7 +273,7 @@ func TestConnection_Handshake(t *testing.T) { errCh <- err }() - peerInfo, peerKey, err := ab.Handshake(ctx, aInfo, aKey) + peerInfo, peerKey, err := ab.Handshake(ctx, 0, aInfo, aKey) require.NoError(t, err) require.Equal(t, bInfo, peerInfo) require.Equal(t, bKey.PubKey(), peerKey) @@ -291,7 +291,7 @@ func TestConnection_HandshakeCancel(t *testing.T) { ab, ba := dialAccept(t, a, b) timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) cancel() - _, _, err := ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey()) + _, _, err := ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey()) require.Error(t, err) require.Equal(t, context.Canceled, err) _ = ab.Close() @@ -301,7 +301,7 @@ func TestConnection_HandshakeCancel(t *testing.T) { ab, ba = dialAccept(t, a, b) timeoutCtx, cancel = context.WithTimeout(ctx, 200*time.Millisecond) defer cancel() - _, _, err = ab.Handshake(timeoutCtx, types.NodeInfo{}, ed25519.GenPrivKey()) + _, _, err = ab.Handshake(timeoutCtx, 0, types.NodeInfo{}, ed25519.GenPrivKey()) require.Error(t, err) require.Equal(t, context.DeadlineExceeded, err) _ = ab.Close() @@ -630,13 +630,13 @@ func dialAcceptHandshake(t *testing.T, a, b p2p.Transport) (p2p.Connection, p2p. go func() { privKey := ed25519.GenPrivKey() nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())} - _, _, err := ba.Handshake(ctx, nodeInfo, privKey) + _, _, err := ba.Handshake(ctx, 0, nodeInfo, privKey) errCh <- err }() privKey := ed25519.GenPrivKey() nodeInfo := types.NodeInfo{NodeID: types.NodeIDFromPubKey(privKey.PubKey())} - _, _, err := ab.Handshake(ctx, nodeInfo, privKey) + _, _, err := ab.Handshake(ctx, 0, nodeInfo, privKey) require.NoError(t, err) timer := time.NewTimer(2 * time.Second) diff --git a/internal/proxy/mocks/app_conn_consensus.go b/internal/proxy/mocks/app_conn_consensus.go index fa93b0931..bff6bf6a4 100644 --- a/internal/proxy/mocks/app_conn_consensus.go +++ b/internal/proxy/mocks/app_conn_consensus.go @@ -150,3 +150,18 @@ func (_m *AppConnConsensus) InitChainSync(_a0 context.Context, _a1 types.Request func (_m *AppConnConsensus) SetResponseCallback(_a0 abciclient.Callback) { _m.Called(_a0) } + +type NewAppConnConsensusT interface { + mock.TestingT + Cleanup(func()) +} + +// NewAppConnConsensus creates a new instance of AppConnConsensus. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAppConnConsensus(t NewAppConnConsensusT) *AppConnConsensus { + mock := &AppConnConsensus{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/proxy/mocks/app_conn_mempool.go b/internal/proxy/mocks/app_conn_mempool.go index 5429d8f90..a17deb7ca 100644 --- a/internal/proxy/mocks/app_conn_mempool.go +++ b/internal/proxy/mocks/app_conn_mempool.go @@ -118,3 +118,18 @@ func (_m *AppConnMempool) FlushSync(_a0 context.Context) error { func (_m *AppConnMempool) SetResponseCallback(_a0 abciclient.Callback) { _m.Called(_a0) } + +type NewAppConnMempoolT interface { + mock.TestingT + Cleanup(func()) +} + +// NewAppConnMempool creates a new instance of AppConnMempool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAppConnMempool(t NewAppConnMempoolT) *AppConnMempool { + mock := &AppConnMempool{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/proxy/mocks/app_conn_query.go b/internal/proxy/mocks/app_conn_query.go index 47ac5bef9..9bde883a0 100644 --- a/internal/proxy/mocks/app_conn_query.go +++ b/internal/proxy/mocks/app_conn_query.go @@ -97,3 +97,18 @@ func (_m *AppConnQuery) QuerySync(_a0 context.Context, _a1 types.RequestQuery) ( return r0, r1 } + +type NewAppConnQueryT interface { + mock.TestingT + Cleanup(func()) +} + +// NewAppConnQuery creates a new instance of AppConnQuery. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAppConnQuery(t NewAppConnQueryT) *AppConnQuery { + mock := &AppConnQuery{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/proxy/mocks/app_conn_snapshot.go b/internal/proxy/mocks/app_conn_snapshot.go index 0b6f10ce1..36df29781 100644 --- a/internal/proxy/mocks/app_conn_snapshot.go +++ b/internal/proxy/mocks/app_conn_snapshot.go @@ -120,3 +120,18 @@ func (_m *AppConnSnapshot) OfferSnapshotSync(_a0 context.Context, _a1 types.Requ return r0, r1 } + +type NewAppConnSnapshotT interface { + mock.TestingT + Cleanup(func()) +} + +// NewAppConnSnapshot creates a new instance of AppConnSnapshot. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAppConnSnapshot(t NewAppConnSnapshotT) *AppConnSnapshot { + mock := &AppConnSnapshot{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/state/indexer/mocks/event_sink.go b/internal/state/indexer/mocks/event_sink.go index 98b32e935..ec5e279a8 100644 --- a/internal/state/indexer/mocks/event_sink.go +++ b/internal/state/indexer/mocks/event_sink.go @@ -165,3 +165,18 @@ func (_m *EventSink) Type() indexer.EventSinkType { return r0 } + +type NewEventSinkT interface { + mock.TestingT + Cleanup(func()) +} + +// NewEventSink creates a new instance of EventSink. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewEventSink(t NewEventSinkT) *EventSink { + mock := &EventSink{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/state/mocks/block_store.go b/internal/state/mocks/block_store.go index 563183437..b4d0c7f99 100644 --- a/internal/state/mocks/block_store.go +++ b/internal/state/mocks/block_store.go @@ -208,3 +208,18 @@ func (_m *BlockStore) Size() int64 { return r0 } + +type NewBlockStoreT interface { + mock.TestingT + Cleanup(func()) +} + +// NewBlockStore creates a new instance of BlockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewBlockStore(t NewBlockStoreT) *BlockStore { + mock := &BlockStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/state/mocks/evidence_pool.go b/internal/state/mocks/evidence_pool.go index 8bf4a9b64..8bc208289 100644 --- a/internal/state/mocks/evidence_pool.go +++ b/internal/state/mocks/evidence_pool.go @@ -68,3 +68,18 @@ func (_m *EvidencePool) PendingEvidence(maxBytes int64) ([]types.Evidence, int64 func (_m *EvidencePool) Update(_a0 state.State, _a1 types.EvidenceList) { _m.Called(_a0, _a1) } + +type NewEvidencePoolT interface { + mock.TestingT + Cleanup(func()) +} + +// NewEvidencePool creates a new instance of EvidencePool. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewEvidencePool(t NewEvidencePoolT) *EvidencePool { + mock := &EvidencePool{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/state/mocks/store.go b/internal/state/mocks/store.go index 02c69d3e0..d08ba4c9e 100644 --- a/internal/state/mocks/store.go +++ b/internal/state/mocks/store.go @@ -186,3 +186,18 @@ func (_m *Store) SaveValidatorSets(_a0 int64, _a1 int64, _a2 *types.ValidatorSet return r0 } + +type NewStoreT interface { + mock.TestingT + Cleanup(func()) +} + +// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewStore(t NewStoreT) *Store { + mock := &Store{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/statesync/mocks/state_provider.go b/internal/statesync/mocks/state_provider.go index b8d681631..17ddb54ac 100644 --- a/internal/statesync/mocks/state_provider.go +++ b/internal/statesync/mocks/state_provider.go @@ -82,3 +82,18 @@ func (_m *StateProvider) State(ctx context.Context, height uint64) (state.State, return r0, r1 } + +type NewStateProviderT interface { + mock.TestingT + Cleanup(func()) +} + +// NewStateProvider creates a new instance of StateProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewStateProvider(t NewStateProviderT) *StateProvider { + mock := &StateProvider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/light/client.go b/light/client.go index 5bff76894..32d4c669b 100644 --- a/light/client.go +++ b/light/client.go @@ -1018,7 +1018,12 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool) // process all the responses as they come in for i := 0; i < cap(witnessResponsesC); i++ { - response := <-witnessResponsesC + var response witnessResponse + select { + case response = <-witnessResponsesC: + case <-ctx.Done(): + return nil, ctx.Err() + } switch response.err { // success! We have found a new primary case nil: @@ -1047,10 +1052,6 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool) // return the light block that new primary responded with return response.lb, nil - // catch canceled contexts or deadlines - case context.Canceled, context.DeadlineExceeded: - return nil, response.err - // process benign errors by logging them only case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh: lastError = response.err diff --git a/light/provider/mocks/provider.go b/light/provider/mocks/provider.go index aa36fa2d3..9a5789ce9 100644 --- a/light/provider/mocks/provider.go +++ b/light/provider/mocks/provider.go @@ -51,3 +51,18 @@ func (_m *Provider) ReportEvidence(_a0 context.Context, _a1 types.Evidence) erro return r0 } + +type NewProviderT interface { + mock.TestingT + Cleanup(func()) +} + +// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewProvider(t NewProviderT) *Provider { + mock := &Provider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/light/rpc/mocks/light_client.go b/light/rpc/mocks/light_client.go index cc32cf649..25e101c86 100644 --- a/light/rpc/mocks/light_client.go +++ b/light/rpc/mocks/light_client.go @@ -99,3 +99,18 @@ func (_m *LightClient) VerifyLightBlockAtHeight(ctx context.Context, height int6 return r0, r1 } + +type NewLightClientT interface { + mock.TestingT + Cleanup(func()) +} + +// NewLightClient creates a new instance of LightClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewLightClient(t NewLightClientT) *LightClient { + mock := &LightClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/node/node.go b/node/node.go index cd0f31396..e1de314d1 100644 --- a/node/node.go +++ b/node/node.go @@ -1246,7 +1246,9 @@ func createAndStartPrivValidatorGRPCClient( func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions { opts := p2p.RouterOptions{ - QueueType: conf.P2P.QueueType, + QueueType: conf.P2P.QueueType, + HandshakeTimeout: conf.P2P.HandshakeTimeout, + DialTimeout: conf.P2P.DialTimeout, } if conf.P2P.MaxNumInboundPeers > 0 { diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index 0a83ef201..0f1581f70 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -800,3 +800,18 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP return r0, r1 } + +type NewClientT interface { + mock.TestingT + Cleanup(func()) +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewClient(t NewClientT) *Client { + mock := &Client{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/scripts/mockery_generate.sh b/scripts/mockery_generate.sh index 382c277bb..2d6f40e63 100755 --- a/scripts/mockery_generate.sh +++ b/scripts/mockery_generate.sh @@ -1,3 +1,15 @@ #!/bin/sh +# +# Invoke Mockery v2 to update generated mocks for the given type. +# +# This script runs a locally-installed "mockery" if available, otherwise it +# runs the published Docker container. This legerdemain is so that the CI build +# and a local build can work off the same script. +# +if ! which mockery ; then + mockery() { + docker run --rm -v "$PWD":/w --workdir=/w vektra/mockery:v2.12.3 + } +fi -go run github.com/vektra/mockery/v2 --disable-version-string --case underscore --name $* +mockery --disable-version-string --case underscore --name "$@" From 486370ac68c05b2996383c25634e43296532558e Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 29 Jun 2022 11:26:28 -0400 Subject: [PATCH 16/21] log: do not pre-process log results (backport #8895) (#8896) (cherry picked from commit 37f9d59969b03d49dc1ff4191b4a5ddac2bc8d13) Co-authored-by: Sam Kleinman --- libs/log/default.go | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/libs/log/default.go b/libs/log/default.go index ca48fcd72..e66043ae7 100644 --- a/libs/log/default.go +++ b/libs/log/default.go @@ -75,7 +75,7 @@ func MustNewDefaultLogger(format, level string, trace bool) Logger { } func (l defaultLogger) Info(msg string, keyVals ...interface{}) { - l.Logger.Info().Fields(getLogFields(keyVals...)).Msg(msg) + l.Logger.Info().Fields(keyVals).Msg(msg) } func (l defaultLogger) Error(msg string, keyVals ...interface{}) { @@ -84,29 +84,16 @@ func (l defaultLogger) Error(msg string, keyVals ...interface{}) { e = e.Stack() } - e.Fields(getLogFields(keyVals...)).Msg(msg) + e.Fields(keyVals).Msg(msg) } func (l defaultLogger) Debug(msg string, keyVals ...interface{}) { - l.Logger.Debug().Fields(getLogFields(keyVals...)).Msg(msg) + l.Logger.Debug().Fields(keyVals).Msg(msg) } func (l defaultLogger) With(keyVals ...interface{}) Logger { return defaultLogger{ - Logger: l.Logger.With().Fields(getLogFields(keyVals...)).Logger(), + Logger: l.Logger.With().Fields(keyVals).Logger(), trace: l.trace, } } - -func getLogFields(keyVals ...interface{}) map[string]interface{} { - if len(keyVals)%2 != 0 { - return nil - } - - fields := make(map[string]interface{}, len(keyVals)) - for i := 0; i < len(keyVals); i += 2 { - fields[fmt.Sprint(keyVals[i])] = keyVals[i+1] - } - - return fields -} From 204281fa666b3ecc5df355748180f03709b0e803 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 29 Jun 2022 22:12:36 -0400 Subject: [PATCH 17/21] node: always start blocksync and avoid misconfiguration (#8902) --- node/node.go | 15 ++++++--------- node/setup.go | 5 +---- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/node/node.go b/node/node.go index e1de314d1..bea42b338 100644 --- a/node/node.go +++ b/node/node.go @@ -700,10 +700,8 @@ func (n *nodeImpl) OnStart() error { } if n.config.Mode != config.ModeSeed { - if n.config.BlockSync.Version == config.BlockSyncV0 { - if err := n.bcReactor.Start(); err != nil { - return err - } + if err := n.bcReactor.Start(); err != nil { + return err } // Start the real consensus reactor separately since the switch uses the shim. @@ -830,11 +828,10 @@ func (n *nodeImpl) OnStop() { if n.config.Mode != config.ModeSeed { // now stop the reactors - if n.config.BlockSync.Version == config.BlockSyncV0 { - // Stop the real blockchain reactor separately since the switch uses the shim. - if err := n.bcReactor.Stop(); err != nil { - n.Logger.Error("failed to stop the blockchain reactor", "err", err) - } + + // Stop the real blockchain reactor separately since the switch uses the shim. + if err := n.bcReactor.Stop(); err != nil { + n.Logger.Error("failed to stop the blockchain reactor", "err", err) } // Stop the real consensus reactor separately since the switch uses the shim. diff --git a/node/setup.go b/node/setup.go index 26a07e9ef..e94e85c78 100644 --- a/node/setup.go +++ b/node/setup.go @@ -17,7 +17,6 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0" - bcv2 "github.com/tendermint/tendermint/internal/blocksync/v2" "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/mempool" @@ -746,10 +745,8 @@ func makeNodeInfo( switch cfg.BlockSync.Version { case config.BlockSyncV0: bcChannel = byte(bcv0.BlockSyncChannel) - case config.BlockSyncV2: - bcChannel = bcv2.BlockchainChannel - + return types.NodeInfo{}, fmt.Errorf("unsupported blocksync version %s", cfg.BlockSync.Version) default: return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", cfg.BlockSync.Version) } From e2d2c04aacb04b7c78de1854f7d04ca631eff6d6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 30 Jun 2022 08:33:13 -0700 Subject: [PATCH 18/21] build(deps): Bump github.com/stretchr/testify from 1.7.2 to 1.8.0 (#8908) Bumps [github.com/stretchr/testify](https://github.com/stretchr/testify) from 1.7.2 to 1.8.0. - [Release notes](https://github.com/stretchr/testify/releases) - [Commits](https://github.com/stretchr/testify/compare/v1.7.2...v1.8.0) --- updated-dependencies: - dependency-name: github.com/stretchr/testify dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 282818d4b..5d5fba1ed 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 - github.com/stretchr/testify v1.7.2 + github.com/stretchr/testify v1.8.0 github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca github.com/tendermint/tm-db v0.6.6 github.com/vektra/mockery/v2 v2.13.1 diff --git a/go.sum b/go.sum index ba72ebf98..a6dee3ad0 100644 --- a/go.sum +++ b/go.sum @@ -1042,8 +1042,9 @@ github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3 github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20200128134331-0f66f006fb2e/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v0.0.0-20170130113145-4d4bfba8f1d1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.1.4/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -1053,8 +1054,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs= github.com/subosito/gotenv v1.4.0 h1:yAzM1+SmVcz5R4tXGsNMu1jUl2aOJXoiWUCEwwnGrvs= From 01984cb3b205c50ce5f1c9e3310a1e0aba09e82b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 30 Jun 2022 17:15:32 -0400 Subject: [PATCH 19/21] p2p: set outgoing connections to around 20% of total connections (#8913) (#8914) (cherry picked from commit 47cb30fc1d647183d500b0ecc31248df5dfc678c) Co-authored-by: William Banfield <4561443+williambanfield@users.noreply.github.com> --- config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index bebbf8d50..c4182fe48 100644 --- a/config/config.go +++ b/config/config.go @@ -778,7 +778,7 @@ func DefaultP2PConfig() *P2PConfig { MaxNumInboundPeers: 40, MaxNumOutboundPeers: 10, MaxConnections: 64, - MaxOutgoingConnections: 32, + MaxOutgoingConnections: 12, MaxIncomingConnectionAttempts: 100, PersistentPeersMaxDialPeriod: 0 * time.Second, FlushThrottleTimeout: 100 * time.Millisecond, From 6a646f366e1a6c5dc902cc0cfbbb5eb484b0f20b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 1 Jul 2022 12:15:22 -0400 Subject: [PATCH 20/21] build(deps): Bump github.com/vektra/mockery/v2 from 2.13.1 to 2.14.0 (#8925) Bumps [github.com/vektra/mockery/v2](https://github.com/vektra/mockery) from 2.13.1 to 2.14.0. - [Release notes](https://github.com/vektra/mockery/releases) - [Changelog](https://github.com/vektra/mockery/blob/master/.goreleaser.yml) - [Commits](https://github.com/vektra/mockery/compare/v2.13.1...v2.14.0) --- updated-dependencies: - dependency-name: github.com/vektra/mockery/v2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 5d5fba1ed..60dac969d 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca github.com/tendermint/tm-db v0.6.6 - github.com/vektra/mockery/v2 v2.13.1 + github.com/vektra/mockery/v2 v2.14.0 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 diff --git a/go.sum b/go.sum index a6dee3ad0..d340e335c 100644 --- a/go.sum +++ b/go.sum @@ -1106,8 +1106,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= -github.com/vektra/mockery/v2 v2.13.1 h1:Lqs7aZiC7TwZO76fJ/4Zsb3NaO4F7cuuz0mZLYeNwtQ= -github.com/vektra/mockery/v2 v2.13.1/go.mod h1:bnD1T8tExSgPD1ripLkDbr60JA9VtQeu12P3wgLZd7M= +github.com/vektra/mockery/v2 v2.14.0 h1:KZ1p5Hrn8tiY+LErRMr14HHle6khxo+JKOXLBW/yfqs= +github.com/vektra/mockery/v2 v2.14.0/go.mod h1:bnD1T8tExSgPD1ripLkDbr60JA9VtQeu12P3wgLZd7M= github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= From e414d0a87817229d4ab0c9cfb711d1278f7c14ee Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 5 Jul 2022 12:19:03 +0200 Subject: [PATCH 21/21] build(deps): Bump github.com/libp2p/go-buffer-pool from 0.0.2 to 0.1.0 (#8931) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 60dac969d..66ce029a0 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lib/pq v1.10.6 - github.com/libp2p/go-buffer-pool v0.0.2 + github.com/libp2p/go-buffer-pool v0.1.0 github.com/minio/highwayhash v1.0.2 github.com/mroth/weightedrand v0.4.1 github.com/oasisprotocol/curve25519-voi v0.0.0-20210609091139-0a56a4bca00b diff --git a/go.sum b/go.sum index d340e335c..7afca0a73 100644 --- a/go.sum +++ b/go.sum @@ -696,8 +696,8 @@ github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs= github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= -github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= +github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= +github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/lufeee/execinquery v1.0.0 h1:1XUTuLIVPDlFvUU3LXmmZwHDsolsxXnY67lzhpeqe0I= github.com/lufeee/execinquery v1.0.0/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=