diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 9ecb104a7..ffe5f9970 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -38,6 +38,13 @@ const ( PeerStatusBad PeerStatus = "bad" // peer observed as bad ) +type peerConnectionDirection int + +const ( + peerConnectionIncoming peerConnectionDirection = iota + peerConnectionOutgoing +) + // PeerScore is a numeric score assigned to a peer (higher is better). type PeerScore int16 @@ -119,6 +126,11 @@ type PeerManagerOptions struct { // outbound). 0 means no limit. MaxConnected uint16 + // MaxOutgoingConnections specifies how many outgoing + // connections. It must be lower than MaxConnected. If it is + // 0, then all connections can be outgoing. + MaxOutgoingConnections uint16 + // MaxConnectedUpgrade is the maximum number of additional connections to // use for probing any better-scored peers to upgrade to when all connection // slots are full. 0 disables peer upgrading. @@ -220,6 +232,10 @@ func (o *PeerManagerOptions) Validate() error { } } + if o.MaxOutgoingConnections > 0 && o.MaxConnected < o.MaxOutgoingConnections { + return errors.New("cannot set MaxOutgoingConnections to a value larger than MaxConnected") + } + return nil } @@ -297,13 +313,13 @@ type PeerManager struct { mtx sync.Mutex store *peerStore - subscriptions map[*PeerUpdates]*PeerUpdates // keyed by struct identity (address) - dialing map[types.NodeID]bool // peers being dialed (DialNext → Dialed/DialFail) - upgrading map[types.NodeID]types.NodeID // peers claimed for upgrade (DialNext → Dialed/DialFail) - connected map[types.NodeID]bool // connected peers (Dialed/Accepted → Disconnected) - ready map[types.NodeID]bool // ready peers (Ready → Disconnected) - evict map[types.NodeID]bool // peers scheduled for eviction (Connected → EvictNext) - evicting map[types.NodeID]bool // peers being evicted (EvictNext → Disconnected) + subscriptions map[*PeerUpdates]*PeerUpdates // keyed by struct identity (address) + dialing map[types.NodeID]bool // peers being dialed (DialNext → Dialed/DialFail) + upgrading map[types.NodeID]types.NodeID // peers claimed for upgrade (DialNext → Dialed/DialFail) + connected map[types.NodeID]peerConnectionDirection // connected peers (Dialed/Accepted → Disconnected) + ready map[types.NodeID]bool // ready peers (Ready → Disconnected) + evict map[types.NodeID]bool // peers scheduled for eviction (Connected → EvictNext) + evicting map[types.NodeID]bool // peers being evicted (EvictNext → Disconnected) } // NewPeerManager creates a new peer manager. @@ -333,7 +349,7 @@ func NewPeerManager(selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptio store: store, dialing: map[types.NodeID]bool{}, upgrading: map[types.NodeID]types.NodeID{}, - connected: map[types.NodeID]bool{}, + connected: map[types.NodeID]peerConnectionDirection{}, ready: map[types.NodeID]bool{}, evict: map[types.NodeID]bool{}, evicting: map[types.NodeID]bool{}, @@ -404,11 +420,12 @@ func (m *PeerManager) prunePeers() error { ranked := m.store.Ranked() for i := len(ranked) - 1; i >= 0; i-- { peerID := ranked[i].ID + switch { case m.store.Size() <= int(m.options.MaxPeers): return nil case m.dialing[peerID]: - case m.connected[peerID]: + case m.isConnected(peerID): default: if err := m.store.Delete(peerID); err != nil { return err @@ -419,6 +436,29 @@ func (m *PeerManager) prunePeers() error { return nil } +func (m *PeerManager) isConnected(peerID types.NodeID) bool { + _, ok := m.connected[peerID] + return ok +} + +type connectedInfo struct { + incoming uint16 + outgoing uint16 +} + +func (m *PeerManager) getConnectedInfo() connectedInfo { + out := connectedInfo{} + for _, direction := range m.connected { + switch direction { + case peerConnectionIncoming: + out.incoming++ + case peerConnectionOutgoing: + out.outgoing++ + } + } + return out +} + // Add adds a peer to the manager, given as an address. If the peer already // exists, the address is added to it if it isn't already present. This will push // low scoring peers out of the address book if it exceeds the maximum size. @@ -511,8 +551,13 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) { return NodeAddress{}, nil } + cinfo := m.getConnectedInfo() + if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections { + return NodeAddress{}, nil + } + for _, peer := range m.store.Ranked() { - if m.dialing[peer.ID] || m.connected[peer.ID] { + if m.dialing[peer.ID] || m.isConnected(peer.ID) { continue } @@ -627,7 +672,7 @@ func (m *PeerManager) Dialed(address NodeAddress) error { if address.NodeID == m.selfID { return fmt.Errorf("rejecting connection to self (%v)", address.NodeID) } - if m.connected[address.NodeID] { + if m.isConnected(address.NodeID) { return fmt.Errorf("peer %v is already connected", address.NodeID) } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { @@ -668,7 +713,7 @@ func (m *PeerManager) Dialed(address NodeAddress) error { } m.evict[upgradeFromPeer] = true } - m.connected[peer.ID] = true + m.connected[peer.ID] = peerConnectionOutgoing m.evictWaker.Wake() return nil @@ -698,7 +743,7 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { if peerID == m.selfID { return fmt.Errorf("rejecting connection from self (%v)", peerID) } - if m.connected[peerID] { + if m.isConnected(peerID) { return fmt.Errorf("peer %q is already connected", peerID) } if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) { @@ -735,7 +780,7 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { return err } - m.connected[peerID] = true + m.connected[peerID] = peerConnectionIncoming if upgradeFromPeer != "" { m.evict[upgradeFromPeer] = true } @@ -754,7 +799,7 @@ func (m *PeerManager) Ready(peerID types.NodeID, channels ChannelIDSet) { m.mtx.Lock() defer m.mtx.Unlock() - if m.connected[peerID] { + if m.isConnected(peerID) { m.ready[peerID] = true m.broadcast(PeerUpdate{ NodeID: peerID, @@ -790,7 +835,7 @@ func (m *PeerManager) TryEvictNext() (types.NodeID, error) { // random one. for peerID := range m.evict { delete(m.evict, peerID) - if m.connected[peerID] && !m.evicting[peerID] { + if m.isConnected(peerID) && !m.evicting[peerID] { m.evicting[peerID] = true return peerID, nil } @@ -807,7 +852,7 @@ func (m *PeerManager) TryEvictNext() (types.NodeID, error) { ranked := m.store.Ranked() for i := len(ranked) - 1; i >= 0; i-- { peer := ranked[i] - if m.connected[peer.ID] && !m.evicting[peer.ID] { + if m.isConnected(peer.ID) && !m.evicting[peer.ID] { m.evicting[peer.ID] = true return peer.ID, nil } @@ -852,7 +897,7 @@ func (m *PeerManager) Errored(peerID types.NodeID, err error) { m.mtx.Lock() defer m.mtx.Unlock() - if m.connected[peerID] { + if m.isConnected(peerID) { m.evict[peerID] = true } @@ -1099,7 +1144,7 @@ func (m *PeerManager) findUpgradeCandidate(id types.NodeID, score PeerScore) typ continue case candidate.Score() >= score: return "" // no further peers can be scored lower, due to sorting - case !m.connected[candidate.ID]: + case !m.isConnected(candidate.ID): case m.evict[candidate.ID]: case m.evicting[candidate.ID]: case m.upgrading[candidate.ID] != "": diff --git a/node/setup.go b/node/setup.go index 4b6065605..aef0da0c0 100644 --- a/node/setup.go +++ b/node/setup.go @@ -498,6 +498,7 @@ func createPeerManager( options := p2p.PeerManagerOptions{ SelfAddress: selfAddr, MaxConnected: maxConns, + MaxOutgoingConnections: maxConns / 2, MaxConnectedUpgrade: maxUpgradeConns, MaxFailedDialAttempts: 1024, MaxPeers: maxUpgradeConns + 2*maxConns,