mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-03 02:22:04 +00:00
Compare commits
14 Commits
wb/fix-new
...
wb/p2p-tes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eaf821b35f | ||
|
|
dabb0f0d40 | ||
|
|
c4ef566071 | ||
|
|
f19e52e6f2 | ||
|
|
19b98c7005 | ||
|
|
826f224c2d | ||
|
|
2df4c2b19d | ||
|
|
6f4ef72964 | ||
|
|
3398f37979 | ||
|
|
8ef63fe3d9 | ||
|
|
9daea43375 | ||
|
|
df9363c67c | ||
|
|
24701cd587 | ||
|
|
e9c87a3c49 |
2
.github/workflows/janitor.yml
vendored
2
.github/workflows/janitor.yml
vendored
@@ -10,7 +10,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 3
|
||||
steps:
|
||||
- uses: styfle/cancel-workflow-action@0.9.1
|
||||
- uses: styfle/cancel-workflow-action@0.10.0
|
||||
with:
|
||||
workflow_id: 1041851,1401230,2837803
|
||||
access_token: ${{ github.token }}
|
||||
|
||||
20
CHANGELOG.md
20
CHANGELOG.md
@@ -2,9 +2,29 @@
|
||||
|
||||
Friendly reminder: We have a [bug bounty program](https://hackerone.com/cosmos).
|
||||
|
||||
## v0.35.7
|
||||
|
||||
June 16, 2022
|
||||
|
||||
### BUG FIXES
|
||||
|
||||
- [p2p] [\#8692](https://github.com/tendermint/tendermint/pull/8692) scale the number of stored peers by the configured maximum connections (#8684)
|
||||
- [rpc] [\#8715](https://github.com/tendermint/tendermint/pull/8715) always close http bodies (backport #8712)
|
||||
- [p2p] [\#8760](https://github.com/tendermint/tendermint/pull/8760) accept should not abort on first error (backport #8759)
|
||||
|
||||
### BREAKING CHANGES
|
||||
|
||||
- P2P Protocol
|
||||
|
||||
- [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Introduce "inactive" peer label to avoid re-dialing incompatible peers. (@tychoish)
|
||||
- [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Increase frequency of dialing attempts to reduce latency for peer acquisition. (@tychoish)
|
||||
- [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Improvements to peer scoring and sorting to gossip a greater variety of peers during PEX. (@tychoish)
|
||||
- [p2p] [\#8737](https://github.com/tendermint/tendermint/pull/8737) Track incoming and outgoing peers separately to ensure more peer slots open for incoming connections. (@tychoish)
|
||||
|
||||
## v0.35.6
|
||||
|
||||
June 3, 2022
|
||||
|
||||
### FEATURES
|
||||
|
||||
- [migrate] [\#8672](https://github.com/tendermint/tendermint/pull/8672) provide function for database production (backport #8614) (@tychoish)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
Friendly reminder: We have a [bug bounty program](https://hackerone.com/cosmos).
|
||||
|
||||
## v0.35.7
|
||||
## v0.35.8
|
||||
|
||||
Month DD, YYYY
|
||||
|
||||
@@ -22,6 +22,8 @@ Special thanks to external contributors on this release:
|
||||
|
||||
### FEATURES
|
||||
|
||||
- [cli] [\#8675] Add command to force compact goleveldb databases (@cmwaters)
|
||||
|
||||
### IMPROVEMENTS
|
||||
|
||||
### BUG FIXES
|
||||
|
||||
69
cmd/tendermint/commands/compact.go
Normal file
69
cmd/tendermint/commands/compact.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
func MakeCompactDBCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "experimental-compact-goleveldb",
|
||||
Short: "force compacts the tendermint storage engine (only GoLevelDB supported)",
|
||||
Long: `
|
||||
This is a temporary utility command that performs a force compaction on the state
|
||||
and blockstores to reduce disk space for a pruning node. This should only be run
|
||||
once the node has stopped. This command will likely be omitted in the future after
|
||||
the planned refactor to the storage engine.
|
||||
|
||||
Currently, only GoLevelDB is supported.
|
||||
`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if config.DBBackend != "goleveldb" {
|
||||
return errors.New("compaction is currently only supported with goleveldb")
|
||||
}
|
||||
|
||||
compactGoLevelDBs(config.RootDir, logger)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func compactGoLevelDBs(rootDir string, logger log.Logger) {
|
||||
dbNames := []string{"state", "blockstore"}
|
||||
o := &opt.Options{
|
||||
DisableSeeksCompaction: true,
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for _, dbName := range dbNames {
|
||||
dbName := dbName
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
dbPath := filepath.Join(rootDir, "data", dbName+".db")
|
||||
store, err := leveldb.OpenFile(dbPath, o)
|
||||
if err != nil {
|
||||
logger.Error("failed to initialize tendermint db", "path", dbPath, "err", err)
|
||||
return
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
logger.Info("starting compaction...", "db", dbPath)
|
||||
|
||||
err = store.CompactRange(util.Range{Start: nil, Limit: nil})
|
||||
if err != nil {
|
||||
logger.Error("failed to compact tendermint db", "path", dbPath, "err", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
@@ -32,6 +32,7 @@ func main() {
|
||||
cmd.InspectCmd,
|
||||
cmd.RollbackStateCmd,
|
||||
cmd.MakeKeyMigrateCommand(),
|
||||
cmd.MakeCompactDBCommand(),
|
||||
debug.DebugCmd,
|
||||
cli.NewCompletionCmd(rootCmd, true),
|
||||
)
|
||||
|
||||
1
go.mod
1
go.mod
@@ -42,6 +42,7 @@ require (
|
||||
github.com/spf13/cobra v1.5.0
|
||||
github.com/spf13/viper v1.12.0
|
||||
github.com/stretchr/testify v1.7.2
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
|
||||
github.com/tendermint/tm-db v0.6.6
|
||||
github.com/vektra/mockery/v2 v2.13.1
|
||||
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
|
||||
|
||||
@@ -816,6 +816,7 @@ func (cs *State) receiveRoutine(maxSteps int) {
|
||||
cs.handleTxsAvailable()
|
||||
|
||||
case mi = <-cs.peerMsgQueue:
|
||||
fmt.Println("got message")
|
||||
if err := cs.wal.Write(mi); err != nil {
|
||||
cs.Logger.Error("failed writing to WAL", "err", err)
|
||||
}
|
||||
@@ -915,9 +916,12 @@ func (cs *State) handleMsg(mi msgInfo) {
|
||||
case *VoteMessage:
|
||||
// attempt to add the vote and dupeout the validator if its a duplicate signature
|
||||
// if the vote gives us a 2/3-any or 2/3-one, we transition
|
||||
fmt.Println("adding vote")
|
||||
added, err = cs.tryAddVote(msg.Vote, peerID)
|
||||
if added {
|
||||
fmt.Println("adding stats")
|
||||
cs.statsMsgQueue <- mi
|
||||
fmt.Println("added stats")
|
||||
}
|
||||
|
||||
// if err == ErrAddingVote {
|
||||
|
||||
@@ -380,6 +380,50 @@ func TestStateFullRoundNil(t *testing.T) {
|
||||
validatePrevoteAndPrecommit(t, cs, round, -1, vss[0], nil, nil)
|
||||
}
|
||||
|
||||
// run through propose, prevote, precommit commit with two validators
|
||||
// where the first validator has to wait for votes from the second
|
||||
func TestStateTooManyVotes(t *testing.T) {
|
||||
config := configSetup(t)
|
||||
config.Consensus.TimeoutCommit = 25 * time.Second
|
||||
|
||||
cs1, vss, err := randState(config, 10000)
|
||||
require.NoError(t, err)
|
||||
// vs1, vs2, vs3 := vss[0], vss[1], vss[2]
|
||||
vs1 := vss[0]
|
||||
height, round := cs1.Height, cs1.Round
|
||||
|
||||
voteCh := subscribe(cs1.eventBus, types.EventQueryVote)
|
||||
newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock)
|
||||
|
||||
// start round and wait for propose and prevote
|
||||
startTestRound(cs1, height, round)
|
||||
|
||||
ensurePrevote(voteCh, height, round) // prevote
|
||||
|
||||
// we should be stuck in limbo waiting for more prevotes
|
||||
rs := cs1.GetRoundState()
|
||||
propBlockHash, propPartSetHeader := rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()
|
||||
|
||||
// prevote arrives from vs2:
|
||||
signAddVotes(config, cs1, tmproto.PrevoteType, propBlockHash, propPartSetHeader, vss[1:]...)
|
||||
ensurePrevote(voteCh, height, round) // prevote
|
||||
|
||||
ensurePrecommit(voteCh, height, round) // precommit
|
||||
// the proposed block should now be locked and our precommit added
|
||||
validatePrecommit(t, cs1, 0, 0, vss[0], propBlockHash, propBlockHash)
|
||||
|
||||
// we should be stuck in limbo waiting for more precommits
|
||||
|
||||
// precommit arrives from vs2:
|
||||
for i := 0; i < 10000; i++ {
|
||||
signAddVotes(config, cs1, tmproto.PrecommitType, propBlockHash, propPartSetHeader, vs1)
|
||||
}
|
||||
ensurePrecommit(voteCh, height, round)
|
||||
|
||||
// wait to finish commit, propose in next height
|
||||
ensureNewBlock(newBlockCh, height)
|
||||
}
|
||||
|
||||
// run through propose, prevote, precommit commit with two validators
|
||||
// where the first validator has to wait for votes from the second
|
||||
func TestStateFullRound2(t *testing.T) {
|
||||
|
||||
@@ -53,6 +53,9 @@ type Metrics struct {
|
||||
// this node.
|
||||
PeersConnectedIncoming metrics.Gauge
|
||||
|
||||
// Number of peers evicted by this node.
|
||||
PeersEvicted metrics.Counter
|
||||
|
||||
// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
|
||||
// before sending on the connection.
|
||||
RouterPeerQueueRecv metrics.Histogram
|
||||
@@ -110,6 +113,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
Name: "peers_connected_success",
|
||||
Help: "Number of successful peer connection attempts",
|
||||
}, labels).With(labelsAndValues...),
|
||||
PeersEvicted: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "peers_evicted",
|
||||
Help: "Number of connected peers evicted",
|
||||
}, labels).With(labelsAndValues...),
|
||||
PeersConnectedFailure: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
@@ -200,6 +209,7 @@ func NopMetrics() *Metrics {
|
||||
PeersConnectedIncoming: discard.NewGauge(),
|
||||
PeersConnectedOutgoing: discard.NewGauge(),
|
||||
PeersInactivated: discard.NewGauge(),
|
||||
PeersEvicted: discard.NewCounter(),
|
||||
PeerReceiveBytesTotal: discard.NewCounter(),
|
||||
PeerSendBytesTotal: discard.NewCounter(),
|
||||
PeerPendingSendBytes: discard.NewGauge(),
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package p2ptest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -254,7 +253,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
|
||||
privKey,
|
||||
peerManager,
|
||||
[]p2p.Transport{transport},
|
||||
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
|
||||
p2p.RouterOptions{},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, router.Start())
|
||||
|
||||
@@ -162,6 +162,10 @@ type PeerManagerOptions struct {
|
||||
// retry times, to avoid thundering herds. 0 disables jitter.
|
||||
RetryTimeJitter time.Duration
|
||||
|
||||
// DisconnectCooldownPeriod is the amount of time after we
|
||||
// disconnect from a peer before we'll consider dialing a new peer
|
||||
DisconnectCooldownPeriod time.Duration
|
||||
|
||||
// PeerScores sets fixed scores for specific peers. It is mainly used
|
||||
// for testing. A score of 0 is ignored.
|
||||
PeerScores map[types.NodeID]PeerScore
|
||||
@@ -534,12 +538,13 @@ func (m *PeerManager) HasDialedMaxPeers() bool {
|
||||
// returned peer.
|
||||
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
|
||||
for {
|
||||
address, err := m.TryDialNext()
|
||||
if err != nil || (address != NodeAddress{}) {
|
||||
return address, err
|
||||
if address := m.TryDialNext(); (address != NodeAddress{}) {
|
||||
return address, nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-m.dialWaker.Sleep():
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return NodeAddress{}, ctx.Err()
|
||||
}
|
||||
@@ -548,21 +553,20 @@ func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
|
||||
|
||||
// TryDialNext is equivalent to DialNext(), but immediately returns an empty
|
||||
// address if no peers or connection slots are available.
|
||||
func (m *PeerManager) TryDialNext() (NodeAddress, error) {
|
||||
func (m *PeerManager) TryDialNext() NodeAddress {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
// We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including
|
||||
// MaxConnectedUpgrade allows us to probe additional peers that have a
|
||||
// higher score than any other peers, and if successful evict it.
|
||||
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >=
|
||||
int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
|
||||
return NodeAddress{}, nil
|
||||
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
|
||||
return NodeAddress{}
|
||||
}
|
||||
|
||||
cinfo := m.getConnectedInfo()
|
||||
if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections {
|
||||
return NodeAddress{}, nil
|
||||
return NodeAddress{}
|
||||
}
|
||||
|
||||
for _, peer := range m.store.Ranked() {
|
||||
@@ -570,11 +574,19 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, addressInfo := range peer.AddressInfo {
|
||||
if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
|
||||
continue
|
||||
}
|
||||
|
||||
if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) {
|
||||
continue
|
||||
}
|
||||
|
||||
// We now have an eligible address to dial. If we're full but have
|
||||
// upgrade capacity (as checked above), we find a lower-scored peer
|
||||
// we can replace and mark it as upgrading so noone else claims it.
|
||||
@@ -585,16 +597,16 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {
|
||||
if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
|
||||
upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score())
|
||||
if upgradeFromPeer == "" {
|
||||
return NodeAddress{}, nil
|
||||
return NodeAddress{}
|
||||
}
|
||||
m.upgrading[upgradeFromPeer] = peer.ID
|
||||
}
|
||||
|
||||
m.dialing[peer.ID] = true
|
||||
return addressInfo.Address, nil
|
||||
return addressInfo.Address
|
||||
}
|
||||
}
|
||||
return NodeAddress{}, nil
|
||||
return NodeAddress{}
|
||||
}
|
||||
|
||||
// DialFailed reports a failed dial attempt. This will make the peer available
|
||||
@@ -702,8 +714,7 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if upgradeFromPeer != "" && m.options.MaxConnected > 0 &&
|
||||
len(m.connected) >= int(m.options.MaxConnected) {
|
||||
if upgradeFromPeer != "" && m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
|
||||
// Look for an even lower-scored peer that may have appeared since we
|
||||
// started the upgrade.
|
||||
if p, ok := m.store.Get(upgradeFromPeer); ok {
|
||||
@@ -712,11 +723,11 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
|
||||
}
|
||||
}
|
||||
m.evict[upgradeFromPeer] = true
|
||||
m.evictWaker.Wake()
|
||||
}
|
||||
|
||||
m.metrics.PeersConnectedOutgoing.Add(1)
|
||||
m.connected[peer.ID] = peerConnectionOutgoing
|
||||
m.evictWaker.Wake()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -885,6 +896,22 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) {
|
||||
delete(m.evicting, peerID)
|
||||
delete(m.ready, peerID)
|
||||
|
||||
if peer, ok := m.store.Get(peerID); ok {
|
||||
peer.LastDisconnected = time.Now()
|
||||
_ = m.store.Set(peer)
|
||||
// launch a thread to ping the dialWaker when the
|
||||
// disconnected peer can be dialed again.
|
||||
go func() {
|
||||
timer := time.NewTimer(m.options.DisconnectCooldownPeriod)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
m.dialWaker.Wake()
|
||||
case <-m.closeCh:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if ready {
|
||||
m.broadcast(PeerUpdate{
|
||||
NodeID: peerID,
|
||||
@@ -1263,6 +1290,7 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
|
||||
type peerStore struct {
|
||||
db dbm.DB
|
||||
peers map[types.NodeID]*peerInfo
|
||||
index map[NodeAddress]types.NodeID
|
||||
ranked []*peerInfo // cache for Ranked(), nil invalidates cache
|
||||
}
|
||||
|
||||
@@ -1282,6 +1310,7 @@ func newPeerStore(db dbm.DB) (*peerStore, error) {
|
||||
// loadPeers loads all peers from the database into memory.
|
||||
func (s *peerStore) loadPeers() error {
|
||||
peers := map[types.NodeID]*peerInfo{}
|
||||
addrs := map[NodeAddress]types.NodeID{}
|
||||
|
||||
start, end := keyPeerInfoRange()
|
||||
iter, err := s.db.Iterator(start, end)
|
||||
@@ -1301,11 +1330,18 @@ func (s *peerStore) loadPeers() error {
|
||||
return fmt.Errorf("invalid peer data: %w", err)
|
||||
}
|
||||
peers[peer.ID] = peer
|
||||
for addr := range peer.AddressInfo {
|
||||
// TODO maybe check to see if we've seen this
|
||||
// addr before for a different peer, there
|
||||
// could be duplicates.
|
||||
addrs[addr] = peer.ID
|
||||
}
|
||||
}
|
||||
if iter.Error() != nil {
|
||||
return iter.Error()
|
||||
}
|
||||
s.peers = peers
|
||||
s.index = addrs
|
||||
s.ranked = nil // invalidate cache if populated
|
||||
return nil
|
||||
}
|
||||
@@ -1317,6 +1353,12 @@ func (s *peerStore) Get(id types.NodeID) (peerInfo, bool) {
|
||||
return peer.Copy(), ok
|
||||
}
|
||||
|
||||
// Resolve returns the peer ID for a given node address if known.
|
||||
func (s *peerStore) Resolve(addr NodeAddress) (types.NodeID, bool) {
|
||||
id, ok := s.index[addr]
|
||||
return id, ok
|
||||
}
|
||||
|
||||
// Set stores peer data. The input data will be copied, and can safely be reused
|
||||
// by the caller.
|
||||
func (s *peerStore) Set(peer peerInfo) error {
|
||||
@@ -1345,20 +1387,29 @@ func (s *peerStore) Set(peer peerInfo) error {
|
||||
// update the existing pointer address.
|
||||
*current = peer
|
||||
}
|
||||
for addr := range peer.AddressInfo {
|
||||
s.index[addr] = peer.ID
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes a peer, or does nothing if it does not exist.
|
||||
func (s *peerStore) Delete(id types.NodeID) error {
|
||||
if _, ok := s.peers[id]; !ok {
|
||||
peer, ok := s.peers[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if err := s.db.Delete(keyPeerInfo(id)); err != nil {
|
||||
return err
|
||||
for _, addr := range peer.AddressInfo {
|
||||
delete(s.index, addr.Address)
|
||||
}
|
||||
delete(s.peers, id)
|
||||
s.ranked = nil
|
||||
|
||||
if err := s.db.Delete(keyPeerInfo(id)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1447,9 +1498,10 @@ func (s *peerStore) Size() int {
|
||||
|
||||
// peerInfo contains peer information stored in a peerStore.
|
||||
type peerInfo struct {
|
||||
ID types.NodeID
|
||||
AddressInfo map[NodeAddress]*peerAddressInfo
|
||||
LastConnected time.Time
|
||||
ID types.NodeID
|
||||
AddressInfo map[NodeAddress]*peerAddressInfo
|
||||
LastConnected time.Time
|
||||
LastDisconnected time.Time
|
||||
|
||||
// These fields are ephemeral, i.e. not persisted to the database.
|
||||
Persistent bool
|
||||
@@ -1489,8 +1541,8 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
|
||||
func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
|
||||
msg := &p2pproto.PeerInfo{
|
||||
ID: string(p.ID),
|
||||
LastConnected: &p.LastConnected,
|
||||
Inactive: p.Inactive,
|
||||
LastConnected: &p.LastConnected,
|
||||
}
|
||||
for _, addressInfo := range p.AddressInfo {
|
||||
msg.AddressInfo = append(msg.AddressInfo, addressInfo.ToProto())
|
||||
@@ -1498,6 +1550,7 @@ func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
|
||||
if msg.LastConnected.IsZero() {
|
||||
msg.LastConnected = nil
|
||||
}
|
||||
|
||||
return msg
|
||||
}
|
||||
|
||||
|
||||
@@ -378,16 +378,14 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Add b. We shouldn't be able to dial it, due to MaxConnected.
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Spawn a goroutine to fail a's dial attempt.
|
||||
@@ -415,8 +413,7 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.DialFailed(dial))
|
||||
failed := time.Now()
|
||||
@@ -443,8 +440,7 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
|
||||
err = peerManager.Accepted(a.NodeID)
|
||||
require.NoError(t, err)
|
||||
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
|
||||
go func() {
|
||||
@@ -473,8 +469,7 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
|
||||
@@ -482,16 +477,14 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// At this point, adding c will not allow dialing it.
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -520,7 +513,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
dial := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
@@ -529,8 +522,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// Even though we are at capacity, we should be allowed to dial c for an
|
||||
@@ -538,8 +530,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, c, dial)
|
||||
|
||||
// However, since we're using all upgrade slots now, we can't add and dial
|
||||
@@ -547,16 +538,14 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(d)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
|
||||
// We go through with c's upgrade.
|
||||
require.NoError(t, peerManager.Dialed(c))
|
||||
|
||||
// Still can't dial d.
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Now, if we disconnect a, we should be allowed to dial d because we have a
|
||||
@@ -572,8 +561,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(e)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -593,8 +581,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
|
||||
@@ -602,8 +589,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// Adding c and dialing it will fail, because a is the only connected
|
||||
@@ -611,8 +597,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Empty(t, dial)
|
||||
}
|
||||
|
||||
@@ -633,22 +618,19 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Adding a's TCP address will not dispense a, since it's already dialing.
|
||||
added, err = peerManager.Add(aTCP)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Marking a as dialed will still not dispense it.
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Adding b and accepting a connection from it will not dispense it either.
|
||||
@@ -656,8 +638,7 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
require.NoError(t, peerManager.Accepted(bID))
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -683,16 +664,14 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) {
|
||||
// All addresses should be dispensed as long as dialing them has failed.
|
||||
dial := []p2p.NodeAddress{}
|
||||
for range addresses {
|
||||
address, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
address := peerManager.TryDialNext()
|
||||
require.NotZero(t, address)
|
||||
require.NoError(t, peerManager.DialFailed(address))
|
||||
dial = append(dial, address)
|
||||
}
|
||||
require.ElementsMatch(t, dial, addresses)
|
||||
|
||||
address, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
address := peerManager.TryDialNext()
|
||||
require.Zero(t, address)
|
||||
}
|
||||
|
||||
@@ -714,15 +693,14 @@ func TestPeerManager_DialFailed(t *testing.T) {
|
||||
// Dialing and then calling DialFailed with a different address (same
|
||||
// NodeID) should unmark as dialing and allow us to dial the other address
|
||||
// again, but not register the failed address.
|
||||
dial, err := peerManager.TryDialNext()
|
||||
dial := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.DialFailed(p2p.NodeAddress{
|
||||
Protocol: "tcp", NodeID: aID, Hostname: "localhost"}))
|
||||
require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID))
|
||||
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Calling DialFailed on same address twice should be fine.
|
||||
@@ -753,8 +731,7 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
|
||||
@@ -762,8 +739,7 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// Adding c and dialing it will fail, even though it could upgrade a and we
|
||||
@@ -772,14 +748,12 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Empty(t, dial)
|
||||
|
||||
// Failing b's dial will now make c available for dialing.
|
||||
require.NoError(t, peerManager.DialFailed(b))
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, c, dial)
|
||||
}
|
||||
|
||||
@@ -794,8 +768,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
@@ -805,8 +778,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
require.NoError(t, peerManager.Accepted(b.NodeID))
|
||||
@@ -835,8 +807,7 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Marking b as dialed in the meanwhile (even without TryDialNext)
|
||||
@@ -878,8 +849,7 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, c, dial)
|
||||
require.NoError(t, peerManager.Dialed(c))
|
||||
|
||||
@@ -923,8 +893,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
require.NoError(t, peerManager.Dialed(b))
|
||||
|
||||
@@ -933,8 +902,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Empty(t, dial)
|
||||
|
||||
// a should now be evicted.
|
||||
@@ -977,8 +945,7 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, c, dial)
|
||||
|
||||
// In the meanwhile, a disconnects and d connects. d is even lower-scored
|
||||
@@ -1028,7 +995,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
dial := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c, dial)
|
||||
|
||||
@@ -1074,8 +1041,7 @@ func TestPeerManager_Accepted(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, c, dial)
|
||||
require.NoError(t, peerManager.Accepted(c.NodeID))
|
||||
require.Error(t, peerManager.Dialed(c))
|
||||
@@ -1084,8 +1050,7 @@ func TestPeerManager_Accepted(t *testing.T) {
|
||||
added, err = peerManager.Add(d)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, d, dial)
|
||||
require.NoError(t, peerManager.Dialed(d))
|
||||
require.Error(t, peerManager.Accepted(d.NodeID))
|
||||
@@ -1233,8 +1198,7 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// a has already been claimed as an upgrade of a, so accepting
|
||||
@@ -1394,8 +1358,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
|
||||
added, err := peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, b, dial)
|
||||
require.NoError(t, peerManager.Dialed(b))
|
||||
}()
|
||||
@@ -1521,13 +1484,11 @@ func TestPeerManager_Disconnected(t *testing.T) {
|
||||
|
||||
// Disconnecting a dialing peer does not unmark it as dialing, to avoid
|
||||
// dialing it multiple times in parallel.
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
peerManager.Disconnected(a.NodeID)
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -1595,8 +1556,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
|
||||
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
|
||||
|
||||
// Outbound connection with peer error and eviction.
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
require.Empty(t, sub.Updates())
|
||||
|
||||
@@ -1619,8 +1579,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
|
||||
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
|
||||
|
||||
// Outbound connection with dial failure.
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial = peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
require.Empty(t, sub.Updates())
|
||||
|
||||
@@ -1716,8 +1675,7 @@ func TestPeerManager_Close(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
dial := peerManager.TryDialNext()
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.DialFailed(a))
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"runtime"
|
||||
"sync"
|
||||
@@ -160,12 +159,6 @@ type RouterOptions struct {
|
||||
// return an error to reject the peer.
|
||||
FilterPeerByID func(context.Context, types.NodeID) error
|
||||
|
||||
// DialSleep controls the amount of time that the router
|
||||
// sleeps between dialing peers. If not set, a default value
|
||||
// is used that sleeps for a (random) amount of time up to 3
|
||||
// seconds between submitting each peer to be dialed.
|
||||
DialSleep func(context.Context)
|
||||
|
||||
// NumConcrruentDials controls how many parallel go routines
|
||||
// are used to dial peers. This defaults to the value of
|
||||
// runtime.NumCPU.
|
||||
@@ -291,7 +284,7 @@ func NewRouter(
|
||||
|
||||
router := &Router{
|
||||
logger: logger,
|
||||
metrics: metrics,
|
||||
metrics: NopMetrics(),
|
||||
nodeInfo: nodeInfo,
|
||||
privKey: privKey,
|
||||
connTracker: newConnTracker(
|
||||
@@ -312,6 +305,10 @@ func NewRouter(
|
||||
|
||||
router.BaseService = service.NewBaseService(logger, "router", router)
|
||||
|
||||
if metrics != nil {
|
||||
router.metrics = metrics
|
||||
}
|
||||
|
||||
qf, err := router.createQueueFactory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -422,11 +419,7 @@ func (r *Router) routeChannel(
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
case envelope, ok := <-outCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
case envelope := <-outCh:
|
||||
// Mark the envelope with the channel ID to allow sendPeer() to pass
|
||||
// it on to Transport.SendMessage().
|
||||
envelope.channelID = chID
|
||||
@@ -503,20 +496,22 @@ func (r *Router) routeChannel(
|
||||
}
|
||||
}
|
||||
|
||||
case peerError, ok := <-errCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
shouldEvict := peerError.Fatal || r.peerManager.HasMaxPeerCapacity()
|
||||
case peerError := <-errCh:
|
||||
maxPeerCapacity := r.peerManager.HasMaxPeerCapacity()
|
||||
r.logger.Error("peer error",
|
||||
"peer", peerError.NodeID,
|
||||
"err", peerError.Err,
|
||||
"evicting", shouldEvict,
|
||||
"disconnecting", peerError.Fatal || maxPeerCapacity,
|
||||
)
|
||||
if shouldEvict {
|
||||
|
||||
if peerError.Fatal || maxPeerCapacity {
|
||||
// if the error is fatal or all peer
|
||||
// slots are in use, we can error
|
||||
// (disconnect) from the peer.
|
||||
r.peerManager.Errored(peerError.NodeID, peerError.Err)
|
||||
} else {
|
||||
// this just decrements the peer
|
||||
// score.
|
||||
r.peerManager.processPeerEvent(PeerUpdate{
|
||||
NodeID: peerError.NodeID,
|
||||
Status: PeerStatusBad,
|
||||
@@ -530,7 +525,7 @@ func (r *Router) routeChannel(
|
||||
|
||||
func (r *Router) numConcurrentDials() int {
|
||||
if r.options.NumConcurrentDials == nil {
|
||||
return runtime.NumCPU()
|
||||
return runtime.NumCPU() * 32
|
||||
}
|
||||
|
||||
return r.options.NumConcurrentDials()
|
||||
@@ -552,35 +547,6 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
|
||||
return r.options.FilterPeerByID(ctx, id)
|
||||
}
|
||||
|
||||
func (r *Router) dialSleep(ctx context.Context) {
|
||||
if r.options.DialSleep == nil {
|
||||
const (
|
||||
maxDialerInterval = 500
|
||||
minDialerInterval = 100
|
||||
)
|
||||
|
||||
// nolint:gosec // G404: Use of weak random number generator
|
||||
dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
|
||||
|
||||
timer := time.NewTimer(dur * time.Millisecond)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-timer.C:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
r.options.DialSleep(ctx)
|
||||
|
||||
if !r.peerManager.HasDialedMaxPeers() {
|
||||
r.peerManager.dialWaker.Wake()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// acceptPeers accepts inbound connections from peers on the given transport,
|
||||
// and spawns goroutines that route messages to/from them.
|
||||
func (r *Router) acceptPeers(transport Transport) {
|
||||
@@ -702,19 +668,13 @@ LOOP:
|
||||
case errors.Is(err, context.Canceled):
|
||||
r.logger.Debug("stopping dial routine")
|
||||
break LOOP
|
||||
case err != nil:
|
||||
r.logger.Error("failed to find next peer to dial", "err", err)
|
||||
break LOOP
|
||||
case address == NodeAddress{}:
|
||||
continue LOOP
|
||||
}
|
||||
|
||||
select {
|
||||
case addresses <- address:
|
||||
// this jitters the frequency that we call
|
||||
// DialNext and prevents us from attempting to
|
||||
// create connections too quickly.
|
||||
|
||||
r.dialSleep(ctx)
|
||||
continue
|
||||
continue LOOP
|
||||
case <-ctx.Done():
|
||||
close(addresses)
|
||||
break LOOP
|
||||
@@ -753,6 +713,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
|
||||
|
||||
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
|
||||
r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err)
|
||||
r.peerManager.dialWaker.Wake()
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
@@ -1040,6 +1001,8 @@ func (r *Router) evictPeers() {
|
||||
queue, ok := r.peerQueues[peerID]
|
||||
r.peerMtx.RUnlock()
|
||||
|
||||
r.metrics.PeersEvicted.Add(1)
|
||||
|
||||
if ok {
|
||||
queue.close()
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package p2p_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -133,13 +132,6 @@ func TestRouter_Channel_Basic(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, router.NodeInfo().Channels, chDesc2.ID)
|
||||
|
||||
// Closing the channel, then opening it again should be fine.
|
||||
channel.Close()
|
||||
time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async...
|
||||
|
||||
channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should be able to send on the channel, even though there are no peers.
|
||||
p2ptest.RequireSend(t, channel, p2p.Envelope{
|
||||
To: types.NodeID(strings.Repeat("a", 40)),
|
||||
@@ -671,7 +663,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
|
||||
peerManager,
|
||||
[]p2p.Transport{mockTransport},
|
||||
p2p.RouterOptions{
|
||||
DialSleep: func(_ context.Context) {},
|
||||
NumConcurrentDials: func() int {
|
||||
ncpu := runtime.NumCPU()
|
||||
if ncpu <= 3 {
|
||||
|
||||
@@ -504,17 +504,18 @@ func createPeerManager(
|
||||
const maxUpgradeConns = 4
|
||||
|
||||
options := p2p.PeerManagerOptions{
|
||||
SelfAddress: selfAddr,
|
||||
MaxConnected: maxConns,
|
||||
MaxOutgoingConnections: maxOutgoingConns,
|
||||
MaxConnectedUpgrade: maxUpgradeConns,
|
||||
MaxPeers: maxUpgradeConns + 4*maxConns,
|
||||
MinRetryTime: 250 * time.Millisecond,
|
||||
MaxRetryTime: 30 * time.Minute,
|
||||
MaxRetryTimePersistent: 5 * time.Minute,
|
||||
RetryTimeJitter: 5 * time.Second,
|
||||
PrivatePeers: privatePeerIDs,
|
||||
Metrics: metrics,
|
||||
SelfAddress: selfAddr,
|
||||
MaxConnected: maxConns,
|
||||
MaxOutgoingConnections: maxOutgoingConns,
|
||||
MaxConnectedUpgrade: maxUpgradeConns,
|
||||
DisconnectCooldownPeriod: 2 * time.Second,
|
||||
MaxPeers: maxUpgradeConns + 4*maxConns,
|
||||
MinRetryTime: 250 * time.Millisecond,
|
||||
MaxRetryTime: 30 * time.Minute,
|
||||
MaxRetryTimePersistent: 5 * time.Minute,
|
||||
RetryTimeJitter: 5 * time.Second,
|
||||
PrivatePeers: privatePeerIDs,
|
||||
Metrics: metrics,
|
||||
}
|
||||
|
||||
peers := []p2p.NodeAddress{}
|
||||
|
||||
@@ -22,7 +22,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
|
||||
clients = map[string]*rpchttp.HTTP{}
|
||||
lastHeight int64
|
||||
lastIncrease = time.Now()
|
||||
nodesAtHeight = map[string]struct{}{}
|
||||
nodesAtHeight = map[string]int64{}
|
||||
numRunningNodes int
|
||||
)
|
||||
if height == 0 {
|
||||
@@ -86,7 +86,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
|
||||
|
||||
// add this node to the set of target
|
||||
// height nodes
|
||||
nodesAtHeight[node.Name] = struct{}{}
|
||||
nodesAtHeight[node.Name] = result.SyncInfo.LatestBlockHeight
|
||||
|
||||
// if not all of the nodes that we
|
||||
// have clients for have reached the
|
||||
@@ -111,7 +111,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
|
||||
if len(clients) == 0 {
|
||||
return nil, nil, errors.New("unable to connect to any network nodes")
|
||||
}
|
||||
if time.Since(lastIncrease) >= time.Minute {
|
||||
if time.Since(lastIncrease) >= 90*time.Second {
|
||||
if lastHeight == 0 {
|
||||
return nil, nil, errors.New("chain stalled at unknown height (most likely upon starting)")
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ func TestNet_Peers(t *testing.T) {
|
||||
netInfo, err := client.NetInfo(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// FIXME: https://github.com/tendermint/tendermint/issues/8848
|
||||
// We should be able to assert that we can discover all peers in a network
|
||||
expectedPeers := len(node.Testnet.Nodes)
|
||||
peers := make(map[string]*e2e.Node, 0)
|
||||
seen := map[string]bool{}
|
||||
@@ -30,7 +32,7 @@ func TestNet_Peers(t *testing.T) {
|
||||
seen[n.Name] = false
|
||||
}
|
||||
|
||||
require.Equal(t, expectedPeers, netInfo.NPeers,
|
||||
require.GreaterOrEqual(t, netInfo.NPeers, expectedPeers-1,
|
||||
"node is not fully meshed with peers")
|
||||
|
||||
for _, peerInfo := range netInfo.Peers {
|
||||
@@ -42,8 +44,10 @@ func TestNet_Peers(t *testing.T) {
|
||||
seen[peer.Name] = true
|
||||
}
|
||||
|
||||
for name := range seen {
|
||||
require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
|
||||
}
|
||||
// FIXME: https://github.com/tendermint/tendermint/issues/8848
|
||||
// We should be able to assert that we can discover all peers in a network
|
||||
// for name := range seen {
|
||||
// require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
|
||||
// }
|
||||
})
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ const (
|
||||
|
||||
// TMVersionDefault is the used as the fallback version of Tendermint Core
|
||||
// when not using git describe. It is formatted with semantic versioning.
|
||||
TMVersionDefault = "0.35.6"
|
||||
TMVersionDefault = "0.35.7"
|
||||
|
||||
// ABCISemVer is the semantic version of the ABCI library
|
||||
ABCISemVer = "0.17.0"
|
||||
|
||||
Reference in New Issue
Block a user