mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-13 00:02:52 +00:00
Compare commits
2 Commits
wb/pqueue-
...
wb/experim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7fa14accff | ||
|
|
af9bca5013 |
2
.github/workflows/janitor.yml
vendored
2
.github/workflows/janitor.yml
vendored
@@ -10,7 +10,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 3
|
||||
steps:
|
||||
- uses: styfle/cancel-workflow-action@0.10.0
|
||||
- uses: styfle/cancel-workflow-action@0.9.1
|
||||
with:
|
||||
workflow_id: 1041851,1401230,2837803
|
||||
access_token: ${{ github.token }}
|
||||
|
||||
@@ -22,8 +22,6 @@ Special thanks to external contributors on this release:
|
||||
|
||||
### FEATURES
|
||||
|
||||
- [cli] [\#8675] Add command to force compact goleveldb databases (@cmwaters)
|
||||
|
||||
### IMPROVEMENTS
|
||||
|
||||
### BUG FIXES
|
||||
|
||||
@@ -1,69 +0,0 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
func MakeCompactDBCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "experimental-compact-goleveldb",
|
||||
Short: "force compacts the tendermint storage engine (only GoLevelDB supported)",
|
||||
Long: `
|
||||
This is a temporary utility command that performs a force compaction on the state
|
||||
and blockstores to reduce disk space for a pruning node. This should only be run
|
||||
once the node has stopped. This command will likely be omitted in the future after
|
||||
the planned refactor to the storage engine.
|
||||
|
||||
Currently, only GoLevelDB is supported.
|
||||
`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if config.DBBackend != "goleveldb" {
|
||||
return errors.New("compaction is currently only supported with goleveldb")
|
||||
}
|
||||
|
||||
compactGoLevelDBs(config.RootDir, logger)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func compactGoLevelDBs(rootDir string, logger log.Logger) {
|
||||
dbNames := []string{"state", "blockstore"}
|
||||
o := &opt.Options{
|
||||
DisableSeeksCompaction: true,
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for _, dbName := range dbNames {
|
||||
dbName := dbName
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
dbPath := filepath.Join(rootDir, "data", dbName+".db")
|
||||
store, err := leveldb.OpenFile(dbPath, o)
|
||||
if err != nil {
|
||||
logger.Error("failed to initialize tendermint db", "path", dbPath, "err", err)
|
||||
return
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
logger.Info("starting compaction...", "db", dbPath)
|
||||
|
||||
err = store.CompactRange(util.Range{Start: nil, Limit: nil})
|
||||
if err != nil {
|
||||
logger.Error("failed to compact tendermint db", "path", dbPath, "err", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
@@ -32,7 +32,6 @@ func main() {
|
||||
cmd.InspectCmd,
|
||||
cmd.RollbackStateCmd,
|
||||
cmd.MakeKeyMigrateCommand(),
|
||||
cmd.MakeCompactDBCommand(),
|
||||
debug.DebugCmd,
|
||||
cli.NewCompletionCmd(rootCmd, true),
|
||||
)
|
||||
|
||||
1
go.mod
1
go.mod
@@ -42,7 +42,6 @@ require (
|
||||
github.com/spf13/cobra v1.5.0
|
||||
github.com/spf13/viper v1.12.0
|
||||
github.com/stretchr/testify v1.7.2
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
|
||||
github.com/tendermint/tm-db v0.6.6
|
||||
github.com/vektra/mockery/v2 v2.13.1
|
||||
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
|
||||
|
||||
@@ -53,9 +53,6 @@ type Metrics struct {
|
||||
// this node.
|
||||
PeersConnectedIncoming metrics.Gauge
|
||||
|
||||
// Number of peers evicted by this node.
|
||||
PeersEvicted metrics.Counter
|
||||
|
||||
// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
|
||||
// before sending on the connection.
|
||||
RouterPeerQueueRecv metrics.Histogram
|
||||
@@ -113,12 +110,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
Name: "peers_connected_success",
|
||||
Help: "Number of successful peer connection attempts",
|
||||
}, labels).With(labelsAndValues...),
|
||||
PeersEvicted: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "peers_evicted",
|
||||
Help: "Number of connected peers evicted",
|
||||
}, labels).With(labelsAndValues...),
|
||||
PeersConnectedFailure: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
@@ -209,7 +200,6 @@ func NopMetrics() *Metrics {
|
||||
PeersConnectedIncoming: discard.NewGauge(),
|
||||
PeersConnectedOutgoing: discard.NewGauge(),
|
||||
PeersInactivated: discard.NewGauge(),
|
||||
PeersEvicted: discard.NewCounter(),
|
||||
PeerReceiveBytesTotal: discard.NewCounter(),
|
||||
PeerSendBytesTotal: discard.NewCounter(),
|
||||
PeerPendingSendBytes: discard.NewGauge(),
|
||||
|
||||
@@ -538,13 +538,12 @@ func (m *PeerManager) HasDialedMaxPeers() bool {
|
||||
// returned peer.
|
||||
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
|
||||
for {
|
||||
if address := m.TryDialNext(); (address != NodeAddress{}) {
|
||||
return address, nil
|
||||
address, err := m.TryDialNext()
|
||||
if err != nil || (address != NodeAddress{}) {
|
||||
return address, err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-m.dialWaker.Sleep():
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return NodeAddress{}, ctx.Err()
|
||||
}
|
||||
@@ -553,20 +552,21 @@ func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
|
||||
|
||||
// TryDialNext is equivalent to DialNext(), but immediately returns an empty
|
||||
// address if no peers or connection slots are available.
|
||||
func (m *PeerManager) TryDialNext() NodeAddress {
|
||||
func (m *PeerManager) TryDialNext() (NodeAddress, error) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
// We allow dialing MaxConnected+MaxConnectedUpgrade peers. Including
|
||||
// MaxConnectedUpgrade allows us to probe additional peers that have a
|
||||
// higher score than any other peers, and if successful evict it.
|
||||
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
|
||||
return NodeAddress{}
|
||||
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >=
|
||||
int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
|
||||
return NodeAddress{}, nil
|
||||
}
|
||||
|
||||
cinfo := m.getConnectedInfo()
|
||||
if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections {
|
||||
return NodeAddress{}
|
||||
return NodeAddress{}, nil
|
||||
}
|
||||
|
||||
for _, peer := range m.store.Ranked() {
|
||||
@@ -583,10 +583,6 @@ func (m *PeerManager) TryDialNext() NodeAddress {
|
||||
continue
|
||||
}
|
||||
|
||||
if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) {
|
||||
continue
|
||||
}
|
||||
|
||||
// We now have an eligible address to dial. If we're full but have
|
||||
// upgrade capacity (as checked above), we find a lower-scored peer
|
||||
// we can replace and mark it as upgrading so noone else claims it.
|
||||
@@ -597,16 +593,16 @@ func (m *PeerManager) TryDialNext() NodeAddress {
|
||||
if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
|
||||
upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score())
|
||||
if upgradeFromPeer == "" {
|
||||
return NodeAddress{}
|
||||
return NodeAddress{}, nil
|
||||
}
|
||||
m.upgrading[upgradeFromPeer] = peer.ID
|
||||
}
|
||||
|
||||
m.dialing[peer.ID] = true
|
||||
return addressInfo.Address
|
||||
return addressInfo.Address, nil
|
||||
}
|
||||
}
|
||||
return NodeAddress{}
|
||||
return NodeAddress{}, nil
|
||||
}
|
||||
|
||||
// DialFailed reports a failed dial attempt. This will make the peer available
|
||||
@@ -714,7 +710,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if upgradeFromPeer != "" && m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
|
||||
if upgradeFromPeer != "" && m.options.MaxConnected > 0 &&
|
||||
len(m.connected) >= int(m.options.MaxConnected) {
|
||||
// Look for an even lower-scored peer that may have appeared since we
|
||||
// started the upgrade.
|
||||
if p, ok := m.store.Get(upgradeFromPeer); ok {
|
||||
@@ -723,11 +720,11 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
|
||||
}
|
||||
}
|
||||
m.evict[upgradeFromPeer] = true
|
||||
m.evictWaker.Wake()
|
||||
}
|
||||
|
||||
m.metrics.PeersConnectedOutgoing.Add(1)
|
||||
m.connected[peer.ID] = peerConnectionOutgoing
|
||||
m.evictWaker.Wake()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1290,7 +1287,6 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
|
||||
type peerStore struct {
|
||||
db dbm.DB
|
||||
peers map[types.NodeID]*peerInfo
|
||||
index map[NodeAddress]types.NodeID
|
||||
ranked []*peerInfo // cache for Ranked(), nil invalidates cache
|
||||
}
|
||||
|
||||
@@ -1310,7 +1306,6 @@ func newPeerStore(db dbm.DB) (*peerStore, error) {
|
||||
// loadPeers loads all peers from the database into memory.
|
||||
func (s *peerStore) loadPeers() error {
|
||||
peers := map[types.NodeID]*peerInfo{}
|
||||
addrs := map[NodeAddress]types.NodeID{}
|
||||
|
||||
start, end := keyPeerInfoRange()
|
||||
iter, err := s.db.Iterator(start, end)
|
||||
@@ -1330,18 +1325,11 @@ func (s *peerStore) loadPeers() error {
|
||||
return fmt.Errorf("invalid peer data: %w", err)
|
||||
}
|
||||
peers[peer.ID] = peer
|
||||
for addr := range peer.AddressInfo {
|
||||
// TODO maybe check to see if we've seen this
|
||||
// addr before for a different peer, there
|
||||
// could be duplicates.
|
||||
addrs[addr] = peer.ID
|
||||
}
|
||||
}
|
||||
if iter.Error() != nil {
|
||||
return iter.Error()
|
||||
}
|
||||
s.peers = peers
|
||||
s.index = addrs
|
||||
s.ranked = nil // invalidate cache if populated
|
||||
return nil
|
||||
}
|
||||
@@ -1353,12 +1341,6 @@ func (s *peerStore) Get(id types.NodeID) (peerInfo, bool) {
|
||||
return peer.Copy(), ok
|
||||
}
|
||||
|
||||
// Resolve returns the peer ID for a given node address if known.
|
||||
func (s *peerStore) Resolve(addr NodeAddress) (types.NodeID, bool) {
|
||||
id, ok := s.index[addr]
|
||||
return id, ok
|
||||
}
|
||||
|
||||
// Set stores peer data. The input data will be copied, and can safely be reused
|
||||
// by the caller.
|
||||
func (s *peerStore) Set(peer peerInfo) error {
|
||||
@@ -1387,29 +1369,20 @@ func (s *peerStore) Set(peer peerInfo) error {
|
||||
// update the existing pointer address.
|
||||
*current = peer
|
||||
}
|
||||
for addr := range peer.AddressInfo {
|
||||
s.index[addr] = peer.ID
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes a peer, or does nothing if it does not exist.
|
||||
func (s *peerStore) Delete(id types.NodeID) error {
|
||||
peer, ok := s.peers[id]
|
||||
if !ok {
|
||||
if _, ok := s.peers[id]; !ok {
|
||||
return nil
|
||||
}
|
||||
for _, addr := range peer.AddressInfo {
|
||||
delete(s.index, addr.Address)
|
||||
}
|
||||
delete(s.peers, id)
|
||||
s.ranked = nil
|
||||
|
||||
if err := s.db.Delete(keyPeerInfo(id)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(s.peers, id)
|
||||
s.ranked = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -378,14 +378,16 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Add b. We shouldn't be able to dial it, due to MaxConnected.
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Spawn a goroutine to fail a's dial attempt.
|
||||
@@ -413,7 +415,8 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.DialFailed(dial))
|
||||
failed := time.Now()
|
||||
@@ -440,7 +443,8 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
|
||||
err = peerManager.Accepted(a.NodeID)
|
||||
require.NoError(t, err)
|
||||
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
|
||||
go func() {
|
||||
@@ -469,7 +473,8 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
|
||||
@@ -477,14 +482,16 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// At this point, adding c will not allow dialing it.
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -513,7 +520,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
@@ -522,7 +529,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// Even though we are at capacity, we should be allowed to dial c for an
|
||||
@@ -530,7 +538,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c, dial)
|
||||
|
||||
// However, since we're using all upgrade slots now, we can't add and dial
|
||||
@@ -538,14 +547,16 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(d)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
|
||||
// We go through with c's upgrade.
|
||||
require.NoError(t, peerManager.Dialed(c))
|
||||
|
||||
// Still can't dial d.
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Now, if we disconnect a, we should be allowed to dial d because we have a
|
||||
@@ -561,7 +572,8 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(e)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -581,7 +593,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
|
||||
@@ -589,7 +602,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// Adding c and dialing it will fail, because a is the only connected
|
||||
@@ -597,7 +611,8 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, dial)
|
||||
}
|
||||
|
||||
@@ -618,19 +633,22 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Adding a's TCP address will not dispense a, since it's already dialing.
|
||||
added, err = peerManager.Add(aTCP)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Marking a as dialed will still not dispense it.
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
|
||||
// Adding b and accepting a connection from it will not dispense it either.
|
||||
@@ -638,7 +656,8 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
require.NoError(t, peerManager.Accepted(bID))
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -664,14 +683,16 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) {
|
||||
// All addresses should be dispensed as long as dialing them has failed.
|
||||
dial := []p2p.NodeAddress{}
|
||||
for range addresses {
|
||||
address := peerManager.TryDialNext()
|
||||
address, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.NotZero(t, address)
|
||||
require.NoError(t, peerManager.DialFailed(address))
|
||||
dial = append(dial, address)
|
||||
}
|
||||
require.ElementsMatch(t, dial, addresses)
|
||||
|
||||
address := peerManager.TryDialNext()
|
||||
address, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, address)
|
||||
}
|
||||
|
||||
@@ -693,14 +714,15 @@ func TestPeerManager_DialFailed(t *testing.T) {
|
||||
// Dialing and then calling DialFailed with a different address (same
|
||||
// NodeID) should unmark as dialing and allow us to dial the other address
|
||||
// again, but not register the failed address.
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.DialFailed(p2p.NodeAddress{
|
||||
Protocol: "tcp", NodeID: aID, Hostname: "localhost"}))
|
||||
require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID))
|
||||
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Calling DialFailed on same address twice should be fine.
|
||||
@@ -731,7 +753,8 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
|
||||
@@ -739,7 +762,8 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// Adding c and dialing it will fail, even though it could upgrade a and we
|
||||
@@ -748,12 +772,14 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, dial)
|
||||
|
||||
// Failing b's dial will now make c available for dialing.
|
||||
require.NoError(t, peerManager.DialFailed(b))
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c, dial)
|
||||
}
|
||||
|
||||
@@ -768,7 +794,8 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
require.NoError(t, peerManager.Dialed(a))
|
||||
@@ -778,7 +805,8 @@ func TestPeerManager_Dialed_Connected(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
require.NoError(t, peerManager.Accepted(b.NodeID))
|
||||
@@ -807,7 +835,8 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
// Marking b as dialed in the meanwhile (even without TryDialNext)
|
||||
@@ -849,7 +878,8 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c, dial)
|
||||
require.NoError(t, peerManager.Dialed(c))
|
||||
|
||||
@@ -893,7 +923,8 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
require.NoError(t, peerManager.Dialed(b))
|
||||
|
||||
@@ -902,7 +933,8 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, dial)
|
||||
|
||||
// a should now be evicted.
|
||||
@@ -945,7 +977,8 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c, dial)
|
||||
|
||||
// In the meanwhile, a disconnects and d connects. d is even lower-scored
|
||||
@@ -995,7 +1028,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c, dial)
|
||||
|
||||
@@ -1041,7 +1074,8 @@ func TestPeerManager_Accepted(t *testing.T) {
|
||||
added, err = peerManager.Add(c)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c, dial)
|
||||
require.NoError(t, peerManager.Accepted(c.NodeID))
|
||||
require.Error(t, peerManager.Dialed(c))
|
||||
@@ -1050,7 +1084,8 @@ func TestPeerManager_Accepted(t *testing.T) {
|
||||
added, err = peerManager.Add(d)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, d, dial)
|
||||
require.NoError(t, peerManager.Dialed(d))
|
||||
require.Error(t, peerManager.Accepted(d.NodeID))
|
||||
@@ -1198,7 +1233,8 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
|
||||
added, err = peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
|
||||
// a has already been claimed as an upgrade of a, so accepting
|
||||
@@ -1358,7 +1394,8 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
|
||||
added, err := peerManager.Add(b)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b, dial)
|
||||
require.NoError(t, peerManager.Dialed(b))
|
||||
}()
|
||||
@@ -1484,11 +1521,13 @@ func TestPeerManager_Disconnected(t *testing.T) {
|
||||
|
||||
// Disconnecting a dialing peer does not unmark it as dialing, to avoid
|
||||
// dialing it multiple times in parallel.
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
|
||||
peerManager.Disconnected(a.NodeID)
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, dial)
|
||||
}
|
||||
|
||||
@@ -1556,7 +1595,8 @@ func TestPeerManager_Subscribe(t *testing.T) {
|
||||
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
|
||||
|
||||
// Outbound connection with peer error and eviction.
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.Empty(t, sub.Updates())
|
||||
|
||||
@@ -1579,7 +1619,8 @@ func TestPeerManager_Subscribe(t *testing.T) {
|
||||
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
|
||||
|
||||
// Outbound connection with dial failure.
|
||||
dial = peerManager.TryDialNext()
|
||||
dial, err = peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.Empty(t, sub.Updates())
|
||||
|
||||
@@ -1675,7 +1716,8 @@ func TestPeerManager_Close(t *testing.T) {
|
||||
added, err := peerManager.Add(a)
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
dial := peerManager.TryDialNext()
|
||||
dial, err := peerManager.TryDialNext()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, a, dial)
|
||||
require.NoError(t, peerManager.DialFailed(a))
|
||||
|
||||
|
||||
@@ -156,7 +156,6 @@ func (s *pqScheduler) start() {
|
||||
func (s *pqScheduler) process() {
|
||||
defer s.done.Close()
|
||||
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case e := <-s.enqueueCh:
|
||||
@@ -248,24 +247,21 @@ LOOP:
|
||||
|
||||
for s.pq.Len() > 0 {
|
||||
pqEnv = heap.Pop(s.pq).(*pqEnvelope)
|
||||
s.size -= pqEnv.size
|
||||
|
||||
// deduct the Envelope size from all the relevant cumulative sizes
|
||||
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
|
||||
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
|
||||
}
|
||||
|
||||
s.metrics.PeerSendBytesTotal.With(
|
||||
"chID", chIDStr,
|
||||
"peer_id", string(pqEnv.envelope.To),
|
||||
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
|
||||
select {
|
||||
case s.dequeueCh <- pqEnv.envelope:
|
||||
s.size -= pqEnv.size
|
||||
|
||||
// deduct the Envelope size from all the relevant cumulative sizes
|
||||
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
|
||||
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
|
||||
}
|
||||
|
||||
s.metrics.PeerSendBytesTotal.With(
|
||||
"chID", chIDStr,
|
||||
"peer_id", string(pqEnv.envelope.To),
|
||||
"message_type", s.metrics.ValueToMetricLabel(pqEnv.envelope.Message)).Add(float64(pqEnv.size))
|
||||
case <-s.closer.Done():
|
||||
return
|
||||
default:
|
||||
heap.Push(s.pq, pqEnv)
|
||||
continue LOOP
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -284,7 +284,7 @@ func NewRouter(
|
||||
|
||||
router := &Router{
|
||||
logger: logger,
|
||||
metrics: NopMetrics(),
|
||||
metrics: metrics,
|
||||
nodeInfo: nodeInfo,
|
||||
privKey: privKey,
|
||||
connTracker: newConnTracker(
|
||||
@@ -305,10 +305,6 @@ func NewRouter(
|
||||
|
||||
router.BaseService = service.NewBaseService(logger, "router", router)
|
||||
|
||||
if metrics != nil {
|
||||
router.metrics = metrics
|
||||
}
|
||||
|
||||
qf, err := router.createQueueFactory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -419,7 +415,11 @@ func (r *Router) routeChannel(
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
case envelope := <-outCh:
|
||||
case envelope, ok := <-outCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Mark the envelope with the channel ID to allow sendPeer() to pass
|
||||
// it on to Transport.SendMessage().
|
||||
envelope.channelID = chID
|
||||
@@ -496,22 +496,20 @@ func (r *Router) routeChannel(
|
||||
}
|
||||
}
|
||||
|
||||
case peerError := <-errCh:
|
||||
maxPeerCapacity := r.peerManager.HasMaxPeerCapacity()
|
||||
case peerError, ok := <-errCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
shouldEvict := peerError.Fatal || r.peerManager.HasMaxPeerCapacity()
|
||||
r.logger.Error("peer error",
|
||||
"peer", peerError.NodeID,
|
||||
"err", peerError.Err,
|
||||
"disconnecting", peerError.Fatal || maxPeerCapacity,
|
||||
"evicting", shouldEvict,
|
||||
)
|
||||
|
||||
if peerError.Fatal || maxPeerCapacity {
|
||||
// if the error is fatal or all peer
|
||||
// slots are in use, we can error
|
||||
// (disconnect) from the peer.
|
||||
if shouldEvict {
|
||||
r.peerManager.Errored(peerError.NodeID, peerError.Err)
|
||||
} else {
|
||||
// this just decrements the peer
|
||||
// score.
|
||||
r.peerManager.processPeerEvent(PeerUpdate{
|
||||
NodeID: peerError.NodeID,
|
||||
Status: PeerStatusBad,
|
||||
@@ -668,8 +666,9 @@ LOOP:
|
||||
case errors.Is(err, context.Canceled):
|
||||
r.logger.Debug("stopping dial routine")
|
||||
break LOOP
|
||||
case address == NodeAddress{}:
|
||||
continue LOOP
|
||||
case err != nil:
|
||||
r.logger.Error("failed to find next peer to dial", "err", err)
|
||||
break LOOP
|
||||
}
|
||||
|
||||
select {
|
||||
@@ -713,7 +712,6 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
|
||||
|
||||
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
|
||||
r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err)
|
||||
r.peerManager.dialWaker.Wake()
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
@@ -995,8 +993,6 @@ func (r *Router) evictPeers() {
|
||||
queue, ok := r.peerQueues[peerID]
|
||||
r.peerMtx.RUnlock()
|
||||
|
||||
r.metrics.PeersEvicted.Add(1)
|
||||
|
||||
if ok {
|
||||
queue.close()
|
||||
}
|
||||
|
||||
@@ -132,6 +132,13 @@ func TestRouter_Channel_Basic(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, router.NodeInfo().Channels, chDesc2.ID)
|
||||
|
||||
// Closing the channel, then opening it again should be fine.
|
||||
channel.Close()
|
||||
time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async...
|
||||
|
||||
channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We should be able to send on the channel, even though there are no peers.
|
||||
p2ptest.RequireSend(t, channel, p2p.Envelope{
|
||||
To: types.NodeID(strings.Repeat("a", 40)),
|
||||
|
||||
@@ -22,7 +22,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
|
||||
clients = map[string]*rpchttp.HTTP{}
|
||||
lastHeight int64
|
||||
lastIncrease = time.Now()
|
||||
nodesAtHeight = map[string]int64{}
|
||||
nodesAtHeight = map[string]struct{}{}
|
||||
numRunningNodes int
|
||||
)
|
||||
if height == 0 {
|
||||
@@ -86,7 +86,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
|
||||
|
||||
// add this node to the set of target
|
||||
// height nodes
|
||||
nodesAtHeight[node.Name] = result.SyncInfo.LatestBlockHeight
|
||||
nodesAtHeight[node.Name] = struct{}{}
|
||||
|
||||
// if not all of the nodes that we
|
||||
// have clients for have reached the
|
||||
@@ -111,7 +111,7 @@ func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*ty
|
||||
if len(clients) == 0 {
|
||||
return nil, nil, errors.New("unable to connect to any network nodes")
|
||||
}
|
||||
if time.Since(lastIncrease) >= 90*time.Second {
|
||||
if time.Since(lastIncrease) >= time.Minute {
|
||||
if lastHeight == 0 {
|
||||
return nil, nil, errors.New("chain stalled at unknown height (most likely upon starting)")
|
||||
}
|
||||
|
||||
@@ -17,8 +17,6 @@ func TestNet_Peers(t *testing.T) {
|
||||
netInfo, err := client.NetInfo(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// FIXME: https://github.com/tendermint/tendermint/issues/8848
|
||||
// We should be able to assert that we can discover all peers in a network
|
||||
expectedPeers := len(node.Testnet.Nodes)
|
||||
peers := make(map[string]*e2e.Node, 0)
|
||||
seen := map[string]bool{}
|
||||
@@ -32,7 +30,7 @@ func TestNet_Peers(t *testing.T) {
|
||||
seen[n.Name] = false
|
||||
}
|
||||
|
||||
require.GreaterOrEqual(t, netInfo.NPeers, expectedPeers-1,
|
||||
require.Equal(t, expectedPeers, netInfo.NPeers,
|
||||
"node is not fully meshed with peers")
|
||||
|
||||
for _, peerInfo := range netInfo.Peers {
|
||||
@@ -44,10 +42,8 @@ func TestNet_Peers(t *testing.T) {
|
||||
seen[peer.Name] = true
|
||||
}
|
||||
|
||||
// FIXME: https://github.com/tendermint/tendermint/issues/8848
|
||||
// We should be able to assert that we can discover all peers in a network
|
||||
// for name := range seen {
|
||||
// require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
|
||||
// }
|
||||
for name := range seen {
|
||||
require.True(t, seen[name], "node %v not peered with %v", node.Name, name)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user