Compare commits

..

2 Commits

Author SHA1 Message Date
mergify[bot]
7fa14accff p2p: remove dial sleep and provide disconnect cooldown (backport #8839) (#8875)
(cherry picked from commit 52b6dc19ba)
2022-06-28 19:06:50 -04:00
William Banfield
af9bca5013 p2p: set empty timeouts to configed values. (manual backport of #8847) (#8869)
* regenerate mocks using newer style

* p2p: set empty timeouts to small values. (#8847)

These timeouts default to 'do not time out' if they are not set. This times up resources, potentially indefinitely. If node on the other side of the the handshake is up but unresponsive, the[ handshake call](edec79448a/internal/p2p/router.go (L720)) will _never_ return.

* fix light client select statement
2022-06-28 19:02:29 -04:00
13 changed files with 147 additions and 220 deletions

View File

@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 3
steps:
- uses: styfle/cancel-workflow-action@0.10.0
- uses: styfle/cancel-workflow-action@0.9.1
with:
workflow_id: 1041851,1401230,2837803
access_token: ${{ github.token }}

View File

@@ -22,8 +22,6 @@ Special thanks to external contributors on this release:
### FEATURES
- [cli] [\#8675] Add command to force compact goleveldb databases (@cmwaters)
### IMPROVEMENTS
### BUG FIXES

View File

@@ -1,69 +0,0 @@
package commands
import (
"errors"
"path/filepath"
"sync"
"github.com/spf13/cobra"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/tendermint/tendermint/libs/log"
)
func MakeCompactDBCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "experimental-compact-goleveldb",
Short: "force compacts the tendermint storage engine (only GoLevelDB supported)",
Long: `
This is a temporary utility command that performs a force compaction on the state
and blockstores to reduce disk space for a pruning node. This should only be run
once the node has stopped. This command will likely be omitted in the future after
the planned refactor to the storage engine.
Currently, only GoLevelDB is supported.
`,
RunE: func(cmd *cobra.Command, args []string) error {
if config.DBBackend != "goleveldb" {
return errors.New("compaction is currently only supported with goleveldb")
}
compactGoLevelDBs(config.RootDir, logger)
return nil
},
}
return cmd
}
func compactGoLevelDBs(rootDir string, logger log.Logger) {
dbNames := []string{"state", "blockstore"}
o := &opt.Options{
DisableSeeksCompaction: true,
}
wg := sync.WaitGroup{}
for _, dbName := range dbNames {
dbName := dbName
wg.Add(1)
go func() {
defer wg.Done()
dbPath := filepath.Join(rootDir, "data", dbName+".db")
store, err := leveldb.OpenFile(dbPath, o)
if err != nil {
logger.Error("failed to initialize tendermint db", "path", dbPath, "err", err)
return
}
defer store.Close()
logger.Info("starting compaction...", "db", dbPath)
err = store.CompactRange(util.Range{Start: nil, Limit: nil})
if err != nil {
logger.Error("failed to compact tendermint db", "path", dbPath, "err", err)
}
}()
}
wg.Wait()
}

View File

@@ -32,7 +32,6 @@ func main() {
cmd.InspectCmd,
cmd.RollbackStateCmd,
cmd.MakeKeyMigrateCommand(),
cmd.MakeCompactDBCommand(),
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),
)

1
go.mod
View File

@@ -42,7 +42,6 @@ require (
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.7.2
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
github.com/tendermint/tm-db v0.6.6
github.com/vektra/mockery/v2 v2.13.1
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e

View File

@@ -53,9 +53,6 @@ type Metrics struct {
// this node.
PeersConnectedIncoming metrics.Gauge
// Number of peers evicted by this node.
PeersEvicted metrics.Counter
// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
// before sending on the connection.
RouterPeerQueueRecv metrics.Histogram
@@ -113,12 +110,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "peers_connected_success",
Help: "Number of successful peer connection attempts",
}, labels).With(labelsAndValues...),
PeersEvicted: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peers_evicted",
Help: "Number of connected peers evicted",
}, labels).With(labelsAndValues...),
PeersConnectedFailure: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
@@ -209,7 +200,6 @@ func NopMetrics() *Metrics {
PeersConnectedIncoming: discard.NewGauge(),
PeersConnectedOutgoing: discard.NewGauge(),
PeersInactivated: discard.NewGauge(),
PeersEvicted: discard.NewCounter(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),

View File

@@ -538,13 +538,12 @@ func (m *PeerManager) HasDialedMaxPeers() bool {
// returned peer.
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
for {
if address := m.TryDialNext(); (address != NodeAddress{}) {
return address, nil
address, err := m.TryDialNext()
if err != nil || (address != NodeAddress{}) {
return address, err
}
select {
case <-m.dialWaker.Sleep():
continue
case <-ctx.Done():
return NodeAddress{}, ctx.Err()
}
@@ -553,20 +552,21 @@ func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
// TryDialNext is equivalent to DialNext(), but immediately returns an empty
// address if no peers or connection slots are available.
func (m *PeerManager) TryDialNext() NodeAddress {
func (m *PeerManager) TryDialNext() (NodeAddress, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
// We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including
// MaxConnectedUpgrade allows us to probe additional peers that have a
// higher score than any other peers, and if successful evict it.
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
return NodeAddress{}
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >=
int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
return NodeAddress{}, nil
}
cinfo := m.getConnectedInfo()
if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections {
return NodeAddress{}
return NodeAddress{}, nil
}
for _, peer := range m.store.Ranked() {
@@ -583,10 +583,6 @@ func (m *PeerManager) TryDialNext() NodeAddress {
continue
}
if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) {
continue
}
// We now have an eligible address to dial. If we're full but have
// upgrade capacity (as checked above), we find a lower-scored peer
// we can replace and mark it as upgrading so noone else claims it.
@@ -597,16 +593,16 @@ func (m *PeerManager) TryDialNext() NodeAddress {
if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score())
if upgradeFromPeer == "" {
return NodeAddress{}
return NodeAddress{}, nil
}
m.upgrading[upgradeFromPeer] = peer.ID
}
m.dialing[peer.ID] = true
return addressInfo.Address
return addressInfo.Address, nil
}
}
return NodeAddress{}
return NodeAddress{}, nil
}
// DialFailed reports a failed dial attempt. This will make the peer available
@@ -714,7 +710,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
return err
}
if upgradeFromPeer != "" && m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
if upgradeFromPeer != "" && m.options.MaxConnected > 0 &&
len(m.connected) >= int(m.options.MaxConnected) {
// Look for an even lower-scored peer that may have appeared since we
// started the upgrade.
if p, ok := m.store.Get(upgradeFromPeer); ok {
@@ -723,11 +720,11 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
}
}
m.evict[upgradeFromPeer] = true
m.evictWaker.Wake()
}
m.metrics.PeersConnectedOutgoing.Add(1)
m.connected[peer.ID] = peerConnectionOutgoing
m.evictWaker.Wake()
return nil
}
@@ -1290,7 +1287,6 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
type peerStore struct {
db dbm.DB
peers map[types.NodeID]*peerInfo
index map[NodeAddress]types.NodeID
ranked []*peerInfo // cache for Ranked(), nil invalidates cache
}
@@ -1310,7 +1306,6 @@ func newPeerStore(db dbm.DB) (*peerStore, error) {
// loadPeers loads all peers from the database into memory.
func (s *peerStore) loadPeers() error {
peers := map[types.NodeID]*peerInfo{}
addrs := map[NodeAddress]types.NodeID{}
start, end := keyPeerInfoRange()
iter, err := s.db.Iterator(start, end)
@@ -1330,18 +1325,11 @@ func (s *peerStore) loadPeers() error {
return fmt.Errorf("invalid peer data: %w", err)
}
peers[peer.ID] = peer
for addr := range peer.AddressInfo {
// TODO maybe check to see if we've seen this
// addr before for a different peer, there
// could be duplicates.
addrs[addr] = peer.ID
}
}
if iter.Error() != nil {
return iter.Error()
}
s.peers = peers
s.index = addrs
s.ranked = nil // invalidate cache if populated
return nil
}
@@ -1353,12 +1341,6 @@ func (s *peerStore) Get(id types.NodeID) (peerInfo, bool) {
return peer.Copy(), ok
}
// Resolve returns the peer ID for a given node address if known.
func (s *peerStore) Resolve(addr NodeAddress) (types.NodeID, bool) {
id, ok := s.index[addr]
return id, ok
}
// Set stores peer data. The input data will be copied, and can safely be reused
// by the caller.
func (s *peerStore) Set(peer peerInfo) error {
@@ -1387,29 +1369,20 @@ func (s *peerStore) Set(peer peerInfo) error {
// update the existing pointer address.
*current = peer
}
for addr := range peer.AddressInfo {
s.index[addr] = peer.ID
}
return nil
}
// Delete deletes a peer, or does nothing if it does not exist.
func (s *peerStore) Delete(id types.NodeID) error {
peer, ok := s.peers[id]
if !ok {
if _, ok := s.peers[id]; !ok {
return nil
}
for _, addr := range peer.AddressInfo {
delete(s.index, addr.Address)
}
delete(s.peers, id)
s.ranked = nil
if err := s.db.Delete(keyPeerInfo(id)); err != nil {
return err
}
delete(s.peers, id)
s.ranked = nil
return nil
}

View File

@@ -378,14 +378,16 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Add b. We shouldn't be able to dial it, due to MaxConnected.
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Spawn a goroutine to fail a's dial attempt.
@@ -413,7 +415,8 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.DialFailed(dial))
failed := time.Now()
@@ -440,7 +443,8 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
err = peerManager.Accepted(a.NodeID)
require.NoError(t, err)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
go func() {
@@ -469,7 +473,8 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -477,14 +482,16 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// At this point, adding c will not allow dialing it.
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -513,7 +520,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -522,7 +529,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Even though we are at capacity, we should be allowed to dial c for an
@@ -530,7 +538,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
// However, since we're using all upgrade slots now, we can't add and dial
@@ -538,14 +547,16 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// We go through with c's upgrade.
require.NoError(t, peerManager.Dialed(c))
// Still can't dial d.
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Now, if we disconnect a, we should be allowed to dial d because we have a
@@ -561,7 +572,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(e)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -581,7 +593,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -589,7 +602,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Adding c and dialing it will fail, because a is the only connected
@@ -597,7 +611,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
}
@@ -618,19 +633,22 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Adding a's TCP address will not dispense a, since it's already dialing.
added, err = peerManager.Add(aTCP)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Marking a as dialed will still not dispense it.
require.NoError(t, peerManager.Dialed(a))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
// Adding b and accepting a connection from it will not dispense it either.
@@ -638,7 +656,8 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(bID))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -664,14 +683,16 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) {
// All addresses should be dispensed as long as dialing them has failed.
dial := []p2p.NodeAddress{}
for range addresses {
address := peerManager.TryDialNext()
address, err := peerManager.TryDialNext()
require.NoError(t, err)
require.NotZero(t, address)
require.NoError(t, peerManager.DialFailed(address))
dial = append(dial, address)
}
require.ElementsMatch(t, dial, addresses)
address := peerManager.TryDialNext()
address, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, address)
}
@@ -693,14 +714,15 @@ func TestPeerManager_DialFailed(t *testing.T) {
// Dialing and then calling DialFailed with a different address (same
// NodeID) should unmark as dialing and allow us to dial the other address
// again, but not register the failed address.
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.DialFailed(p2p.NodeAddress{
Protocol: "tcp", NodeID: aID, Hostname: "localhost"}))
require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Calling DialFailed on same address twice should be fine.
@@ -731,7 +753,8 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -739,7 +762,8 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// Adding c and dialing it will fail, even though it could upgrade a and we
@@ -748,12 +772,14 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
// Failing b's dial will now make c available for dialing.
require.NoError(t, peerManager.DialFailed(b))
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
}
@@ -768,7 +794,8 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))
@@ -778,7 +805,8 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
require.NoError(t, peerManager.Accepted(b.NodeID))
@@ -807,7 +835,8 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
// Marking b as dialed in the meanwhile (even without TryDialNext)
@@ -849,7 +878,8 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
require.NoError(t, peerManager.Dialed(c))
@@ -893,7 +923,8 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
require.NoError(t, peerManager.Dialed(b))
@@ -902,7 +933,8 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
// a should now be evicted.
@@ -945,7 +977,8 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
// In the meanwhile, a disconnects and d connects. d is even lower-scored
@@ -995,7 +1028,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
@@ -1041,7 +1074,8 @@ func TestPeerManager_Accepted(t *testing.T) {
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
require.NoError(t, peerManager.Accepted(c.NodeID))
require.Error(t, peerManager.Dialed(c))
@@ -1050,7 +1084,8 @@ func TestPeerManager_Accepted(t *testing.T) {
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, d, dial)
require.NoError(t, peerManager.Dialed(d))
require.Error(t, peerManager.Accepted(d.NodeID))
@@ -1198,7 +1233,8 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
// a has already been claimed as an upgrade of a, so accepting
@@ -1358,7 +1394,8 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
added, err := peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)
require.NoError(t, peerManager.Dialed(b))
}()
@@ -1484,11 +1521,13 @@ func TestPeerManager_Disconnected(t *testing.T) {
// Disconnecting a dialing peer does not unmark it as dialing, to avoid
// dialing it multiple times in parallel.
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
peerManager.Disconnected(a.NodeID)
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
}
@@ -1556,7 +1595,8 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
// Outbound connection with peer error and eviction.
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.Empty(t, sub.Updates())
@@ -1579,7 +1619,8 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
// Outbound connection with dial failure.
dial = peerManager.TryDialNext()
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.Empty(t, sub.Updates())
@@ -1675,7 +1716,8 @@ func TestPeerManager_Close(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
dial := peerManager.TryDialNext()
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.DialFailed(a))

View File

@@ -156,7 +156,6 @@ func (s *pqScheduler) start() {
func (s *pqScheduler) process() {
defer s.done.Close()
LOOP:
for {
select {
case e := <-s.enqueueCh:
@@ -248,24 +247,21 @@ LOOP:
for s.pq.Len() > 0 {
pqEnv = heap.Pop(s.pq).(*pqEnvelope)
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
select {
case s.dequeueCh <- pqEnv.envelope:
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.metrics.PeerSendBytesTotal.With(
"chID", chIDStr,
"peer_id", string(pqEnv.envelope.To),
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
case <-s.closer.Done():
return
default:
heap.Push(s.pq, pqEnv)
continue LOOP
}
}

View File

@@ -284,7 +284,7 @@ func NewRouter(
router := &Router{
logger: logger,
metrics: NopMetrics(),
metrics: metrics,
nodeInfo: nodeInfo,
privKey: privKey,
connTracker: newConnTracker(
@@ -305,10 +305,6 @@ func NewRouter(
router.BaseService = service.NewBaseService(logger, "router", router)
if metrics != nil {
router.metrics = metrics
}
qf, err := router.createQueueFactory()
if err != nil {
return nil, err
@@ -419,7 +415,11 @@ func (r *Router) routeChannel(
) {
for {
select {
case envelope := <-outCh:
case envelope, ok := <-outCh:
if !ok {
return
}
// Mark the envelope with the channel ID to allow sendPeer() to pass
// it on to Transport.SendMessage().
envelope.channelID = chID
@@ -496,22 +496,20 @@ func (r *Router) routeChannel(
}
}
case peerError := <-errCh:
maxPeerCapacity := r.peerManager.HasMaxPeerCapacity()
case peerError, ok := <-errCh:
if !ok {
return
}
shouldEvict := peerError.Fatal || r.peerManager.HasMaxPeerCapacity()
r.logger.Error("peer error",
"peer", peerError.NodeID,
"err", peerError.Err,
"disconnecting", peerError.Fatal || maxPeerCapacity,
"evicting", shouldEvict,
)
if peerError.Fatal || maxPeerCapacity {
// if the error is fatal or all peer
// slots are in use, we can error
// (disconnect) from the peer.
if shouldEvict {
r.peerManager.Errored(peerError.NodeID, peerError.Err)
} else {
// this just decrements the peer
// score.
r.peerManager.processPeerEvent(PeerUpdate{
NodeID: peerError.NodeID,
Status: PeerStatusBad,
@@ -668,8 +666,9 @@ LOOP:
case errors.Is(err, context.Canceled):
r.logger.Debug("stopping dial routine")
break LOOP
case address == NodeAddress{}:
continue LOOP
case err != nil:
r.logger.Error("failed to find next peer to dial", "err", err)
break LOOP
}
select {
@@ -713,7 +712,6 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err)
r.peerManager.dialWaker.Wake()
conn.Close()
return
}
@@ -995,8 +993,6 @@ func (r *Router) evictPeers() {
queue, ok := r.peerQueues[peerID]
r.peerMtx.RUnlock()
r.metrics.PeersEvicted.Add(1)
if ok {
queue.close()
}

View File

@@ -132,6 +132,13 @@ func TestRouter_Channel_Basic(t *testing.T) {
require.NoError(t, err)
require.Contains(t, router.NodeInfo().Channels, chDesc2.ID)
// Closing the channel, then opening it again should be fine.
channel.Close()
time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async...
channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
require.NoError(t, err)
// We should be able to send on the channel, even though there are no peers.
p2ptest.RequireSend(t, channel, p2p.Envelope{
To: types.NodeID(strings.Repeat("a", 40)),

View File

@@ -22,7 +22,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
clients = map[string]*rpchttp.HTTP{}
lastHeight int64
lastIncrease = time.Now()
nodesAtHeight = map[string]int64{}
nodesAtHeight = map[string]struct{}{}
numRunningNodes int
)
if height == 0 {
@@ -86,7 +86,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
// add this node to the set of target
// height nodes
nodesAtHeight[node.Name] = result.SyncInfo.LatestBlockHeight
nodesAtHeight[node.Name] = struct{}{}
// if not all of the nodes that we
// have clients for have reached the
@@ -111,7 +111,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
if len(clients) == 0 {
return nil, nil, errors.New("unable to connect to any network nodes")
}
if time.Since(lastIncrease) >= 90*time.Second {
if time.Since(lastIncrease) >= time.Minute {
if lastHeight == 0 {
return nil, nil, errors.New("chain stalled at unknown height (most likely upon starting)")
}

View File

@@ -17,8 +17,6 @@ func TestNet_Peers(t *testing.T) {
netInfo, err := client.NetInfo(ctx)
require.NoError(t, err)
// FIXME: https://github.com/tendermint/tendermint/issues/8848
// We should be able to assert that we can discover all peers in a network
expectedPeers := len(node.Testnet.Nodes)
peers := make(map[string]*e2e.Node, 0)
seen := map[string]bool{}
@@ -32,7 +30,7 @@ func TestNet_Peers(t *testing.T) {
seen[n.Name] = false
}
require.GreaterOrEqual(t, netInfo.NPeers, expectedPeers-1,
require.Equal(t, expectedPeers, netInfo.NPeers,
"node is not fully meshed with peers")
for _, peerInfo := range netInfo.Peers {
@@ -44,10 +42,8 @@ func TestNet_Peers(t *testing.T) {
seen[peer.Name] = true
}
// FIXME: https://github.com/tendermint/tendermint/issues/8848
// We should be able to assert that we can discover all peers in a network
// for name := range seen {
// require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
// }
for name := range seen {
require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
}
})
}