From c2f97e64545920b98aa4ea48f9669b643a80b001 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 20 Jan 2018 18:28:40 -0500 Subject: [PATCH] p2p: seed mode fixes from rebase and review --- p2p/connection.go | 4 + p2p/pex_reactor.go | 418 +++++++++++++--------------------------- p2p/pex_reactor_test.go | 48 +++-- 3 files changed, 158 insertions(+), 312 deletions(-) diff --git a/p2p/connection.go b/p2p/connection.go index 306eaf7eb..dcb660967 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -88,6 +88,8 @@ type MConnection struct { flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically chStatsTimer *cmn.RepeatTimer // update channel stats periodically + + created time.Time // time of creation } // MConnConfig is a MConnection configuration. @@ -502,6 +504,7 @@ FOR_LOOP: } type ConnectionStatus struct { + Duration time.Duration SendMonitor flow.Status RecvMonitor flow.Status Channels []ChannelStatus @@ -517,6 +520,7 @@ type ChannelStatus struct { func (c *MConnection) Status() ConnectionStatus { var status ConnectionStatus + status.Duration = time.Since(c.created) status.SendMonitor = c.sendMonitor.Status() status.RecvMonitor = c.recvMonitor.Status() status.Channels = make([]ChannelStatus, len(c.channels)) diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 0c3567a35..5d9194213 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -17,15 +17,22 @@ const ( // PexChannel is a channel for PEX messages PexChannel = byte(0x00) - // period to ensure peers connected - defaultEnsurePeersPeriod = 30 * time.Second - minNumOutboundPeers = 10 - maxPexMessageSize = 1048576 // 1MB + maxPexMessageSize = 1048576 // 1MB + + // ensure we have enough peers + defaultEnsurePeersPeriod = 30 * time.Second + defaultMinNumOutboundPeers = 10 // Seed/Crawler constants - defaultSeedDisconnectWaitPeriod = 2 * time.Minute - defaultCrawlPeerInterval = 2 * time.Minute - defaultCrawlPeersPeriod = 30 * time.Second + // TODO: + // We want seeds to only advertise good peers. + // Peers are marked by external mechanisms. + // We need a config value that can be set to be + // on the order of how long it would take before a good + // peer is marked good. + defaultSeedDisconnectWaitPeriod = 2 * time.Minute // disconnect after this + defaultCrawlPeerInterval = 2 * time.Minute // dont redial for this. TODO: back-off + defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this ) // PEXReactor handles PEX (peer exchange) and ensures that an @@ -51,8 +58,11 @@ type PEXReactor struct { // PEXReactorConfig holds reactor specific configuration data. type PEXReactorConfig struct { - // Seeds is a list of addresses reactor may use if it can't connect to peers - // in the addrbook. + // Seed/Crawler mode + SeedMode bool + + // Seeds is a list of addresses reactor may use + // if it can't connect to peers in the addrbook. Seeds []string } @@ -259,19 +269,12 @@ func (r *PEXReactor) ensurePeersRoutine() { // ensurePeers ensures that sufficient peers are connected. (once) // -// Old bucket / New bucket are arbitrary categories to denote whether an -// address is vetted or not, and this needs to be determined over time via a // heuristic that we haven't perfected yet, or, perhaps is manually edited by // the node operator. It should not be used to compute what addresses are // already connected or not. -// -// TODO Basically, we need to work harder on our good-peer/bad-peer marking. -// What we're currently doing in terms of marking good/bad peers is just a -// placeholder. It should not be the case that an address becomes old/vetted -// upon a single successful connection. func (r *PEXReactor) ensurePeers() { numOutPeers, numInPeers, numDialing := r.Switch.NumPeers() - numToDial := minNumOutboundPeers - (numOutPeers + numDialing) + numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing) r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) if numToDial <= 0 { return @@ -327,7 +330,7 @@ func (r *PEXReactor) ensurePeers() { // If we are not connected to nor dialing anybody, fallback to dialing a seed. if numOutPeers+numInPeers+numDialing+len(toDial) == 0 { r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds") - r.dialSeed() + r.dialSeeds() } } @@ -346,275 +349,8 @@ func (r *PEXReactor) checkSeeds() error { return nil } -// Explores the network searching for more peers. (continuous) -// Seed/Crawler Mode causes this node to quickly disconnect -// from peers, except other seed nodes. -func (r *PEXReactor) crawlPeersRoutine() { - // Do an initial crawl - r.crawlPeers() - - // Fire periodically - ticker := time.NewTicker(defaultSeedModePeriod) - - for { - select { - case <-ticker.C: - r.attemptDisconnects() - r.crawlPeers() - case <-r.Quit: - return - } - } -} - -// crawlStatus handles temporary data needed for the -// network crawling performed during seed/crawler mode. -type crawlStatus struct { - // The remote address of a potential peer we learned about - Addr *NetAddress - - // Not empty if we are connected to the address - PeerID string - - // The last time we attempt to reach this address - LastAttempt time.Time - - // The last time we successfully reached this address - LastSuccess time.Time -} - -// oldestFirst implements sort.Interface for []crawlStatus -// based on the LastAttempt field. -type oldestFirst []crawlStatus - -func (of oldestFirst) Len() int { return len(of) } -func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] } -func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) } - -// getCrawlStatus returns addresses of potential peers that we wish to validate. -// NOTE: The status information is ordered as described above. -func (r *PEXReactor) getCrawlStatus() []crawlStatus { - var of oldestFirst - - addrs := r.book.ListOfKnownAddresses() - // Go through all the addresses in the AddressBook - for _, addr := range addrs { - var peerID string - - // Check if a peer is already connected from this addr - if p := r.Switch.peers.GetByRemoteAddr(addr.Addr); p != nil { - peerID = p.Key() - } - - of = append(of, crawlStatus{ - Addr: addr.Addr, - PeerID: peerID, - LastAttempt: addr.LastAttempt, - LastSuccess: addr.LastSuccess, - }) - } - sort.Sort(of) - return of -} - -// crawlPeers will crawl the network looking for new peer addresses. (once) -// -// TODO Basically, we need to work harder on our good-peer/bad-peer marking. -// What we're currently doing in terms of marking good/bad peers is just a -// placeholder. It should not be the case that an address becomes old/vetted -// upon a single successful connection. -func (r *PEXReactor) crawlPeers() { - crawlerStatus := r.getCrawlStatus() - - now := time.Now() - // Use addresses we know of to reach additional peers - for _, cs := range crawlerStatus { - // Do not dial peers that are already connected - if cs.PeerID != "" { - continue - } - // Do not attempt to connect with peers we recently dialed - if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval { - continue - } - // Otherwise, attempt to connect with the known address - p, err := r.Switch.DialPeerWithAddress(cs.Addr, false) - if err != nil { - r.book.MarkAttempt(cs.Addr) - continue - } - // Enter the peer ID into our crawl status information - cs.PeerID = p.Key() - r.book.MarkGood(cs.Addr) - } - // Crawl the connected peers asking for more addresses - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } - // We will wait a minimum period of time before crawling peers again - if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval { - p := r.Switch.Peers().Get(cs.PeerID) - if p != nil { - r.RequestPEX(p) - r.book.MarkAttempt(cs.Addr) - } - } - } -} - -// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once) -func (r *PEXReactor) attemptDisconnects() { - crawlerStatus := r.getCrawlStatus() - - now := time.Now() - // Go through each peer we have connected with - // looking for opportunities to disconnect - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } - // Remain connected to each peer for a minimum period of time - if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod { - continue - } - // Fetch the Peer using the saved ID - p := r.Switch.Peers().Get(cs.PeerID) - if p == nil { - continue - } - // Do not disconnect from persistent peers. - // Specifically, we need to remain connected to other seeds - if p.IsPersistent() { - continue - } - // Otherwise, disconnect from the peer - r.Switch.StopPeerGracefully(p) - } -} - -// crawlStatus handles temporary data needed for the -// network crawling performed during seed/crawler mode. -type crawlStatus struct { - // The remote address of a potential peer we learned about - Addr *NetAddress - - // Not empty if we are connected to the address - PeerID string - - // The last time we attempt to reach this address - LastAttempt time.Time - - // The last time we successfully reached this address - LastSuccess time.Time -} - -// oldestAttempt implements sort.Interface for []crawlStatus -// based on the LastAttempt field. -type oldestAttempt []crawlStatus - -func (oa oldestAttempt) Len() int { return len(oa) } -func (oa oldestAttempt) Swap(i, j int) { oa[i], oa[j] = oa[j], oa[i] } -func (oa oldestAttempt) Less(i, j int) bool { return oa[i].LastAttempt.Before(oa[j].LastAttempt) } - -// getCrawlStatus returns addresses of potential peers that we wish to validate. -// NOTE: The status information is ordered as described above. -func (r *PEXReactor) getCrawlStatus() []crawlStatus { - var oa oldestAttempt - - addrs := r.book.ListOfKnownAddresses() - // Go through all the addresses in the AddressBook - for _, addr := range addrs { - p := r.Switch.peers.GetByRemoteAddr(addr.Addr) - - oa = append(oa, crawlStatus{ - Addr: addr.Addr, - PeerID: p.Key(), - LastAttempt: addr.LastAttempt, - LastSuccess: addr.LastSuccess, - }) - } - sort.Sort(oa) - return oa -} - -// crawlPeers will crawl the network looking for new peer addresses. (once) -// -// TODO Basically, we need to work harder on our good-peer/bad-peer marking. -// What we're currently doing in terms of marking good/bad peers is just a -// placeholder. It should not be the case that an address becomes old/vetted -// upon a single successful connection. -func (r *PEXReactor) crawlPeers() { - crawlerStatus := r.getCrawlStatus() - - now := time.Now() - // Use addresses we know of to reach additional peers - for _, cs := range crawlerStatus { - // Do not dial peers that are already connected - if cs.PeerID != "" { - continue - } - // Do not attempt to connect with peers we recently dialed - if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval { - continue - } - // Otherwise, attempt to connect with the known address - p, err := r.Switch.DialPeerWithAddress(cs.Addr, false) - if err != nil { - r.book.MarkAttempt(cs.Addr) - continue - } - // Enter the peer ID into our crawl status information - cs.PeerID = p.Key() - r.book.MarkGood(cs.Addr) - } - // Crawl the connected peers asking for more addresses - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } - // We will wait a minimum period of time before crawling peers again - if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval { - p := r.Switch.peers.Get(cs.PeerID) - if p != nil { - r.RequestPEX(p) - } - } - } -} - -// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once) -func (r *PEXReactor) attemptDisconnects() { - crawlerStatus := r.getCrawlStatus() - - now := time.Now() - // Go through each peer we have connected with - // looking for opportunities to disconnect - for _, cs := range crawlerStatus { - if cs.PeerID == "" { - continue - } - // Remain connected to each peer for a minimum period of time - if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod { - continue - } - // Fetch the Peer using the saved ID - p := r.Switch.peers.Get(cs.PeerID) - if p == nil { - continue - } - // Do not disconnect from persistent peers. - // Specifically, we need to remain connected to other seeds - if p.IsPersistent() { - continue - } - // Otherwise, disconnect from the peer - r.Switch.StopPeerGracefully(p) - } -} - // randomly dial seeds until we connect to one or exhaust them -func (r *PEXReactor) dialSeed() { +func (r *PEXReactor) dialSeeds() { lSeeds := len(r.config.Seeds) if lSeeds == 0 { return @@ -636,6 +372,116 @@ func (r *PEXReactor) dialSeed() { r.Switch.Logger.Error("Couldn't connect to any seeds") } +//---------------------------------------------------------- + +// Explores the network searching for more peers. (continuous) +// Seed/Crawler Mode causes this node to quickly disconnect +// from peers, except other seed nodes. +func (r *PEXReactor) crawlPeersRoutine() { + // Do an initial crawl + r.crawlPeers() + + // Fire periodically + ticker := time.NewTicker(defaultCrawlPeersPeriod) + + for { + select { + case <-ticker.C: + r.attemptDisconnects() + r.crawlPeers() + case <-r.Quit: + return + } + } +} + +// crawlPeerInfo handles temporary data needed for the +// network crawling performed during seed/crawler mode. +type crawlPeerInfo struct { + // The listening address of a potential peer we learned about + Addr *NetAddress + + // The last time we attempt to reach this address + LastAttempt time.Time + + // The last time we successfully reached this address + LastSuccess time.Time +} + +// oldestFirst implements sort.Interface for []crawlPeerInfo +// based on the LastAttempt field. +type oldestFirst []crawlPeerInfo + +func (of oldestFirst) Len() int { return len(of) } +func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] } +func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) } + +// getPeersToCrawl returns addresses of potential peers that we wish to validate. +// NOTE: The status information is ordered as described above. +func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo { + var of oldestFirst + + // TODO: not this. be more selective + addrs := r.book.ListOfKnownAddresses() + for _, addr := range addrs { + if len(addr.ID()) == 0 { + continue // dont use peers without id + } + + of = append(of, crawlPeerInfo{ + Addr: addr.Addr, + LastAttempt: addr.LastAttempt, + LastSuccess: addr.LastSuccess, + }) + } + sort.Sort(of) + return of +} + +// crawlPeers will crawl the network looking for new peer addresses. (once) +func (r *PEXReactor) crawlPeers() { + peerInfos := r.getPeersToCrawl() + + now := time.Now() + // Use addresses we know of to reach additional peers + for _, pi := range peerInfos { + // Do not attempt to connect with peers we recently dialed + if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval { + continue + } + // Otherwise, attempt to connect with the known address + _, err := r.Switch.DialPeerWithAddress(pi.Addr, false) + if err != nil { + r.book.MarkAttempt(pi.Addr) + continue + } + } + // Crawl the connected peers asking for more addresses + for _, pi := range peerInfos { + // We will wait a minimum period of time before crawling peers again + if now.Sub(pi.LastAttempt) >= defaultCrawlPeerInterval { + peer := r.Switch.Peers().Get(pi.Addr.ID) + if peer != nil { + r.RequestPEX(peer) + } + } + } +} + +// attemptDisconnects checks if we've been with each peer long enough to disconnect +func (r *PEXReactor) attemptDisconnects() { + for _, peer := range r.Switch.Peers().List() { + status := peer.Status() + if status.Duration < defaultSeedDisconnectWaitPeriod { + continue + } + if peer.IsPersistent() { + continue + } + r.Switch.StopPeerGracefully(peer) + } +} + //----------------------------------------------------------------------------- // Messages diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index b8ee89b32..91e30fea2 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -295,46 +295,42 @@ func TestPEXReactorCrawlStatus(t *testing.T) { book := NewAddrBook(dir+"addrbook.json", false) book.SetLogger(log.TestingLogger()) - var r *PEXReactor + pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true}) // Seed/Crawler mode uses data from the Switch makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { - r = NewPEXReactor(book, true) - r.SetLogger(log.TestingLogger()) + pexR.SetLogger(log.TestingLogger()) sw.SetLogger(log.TestingLogger().With("switch", i)) - sw.AddReactor("pex", r) + sw.AddReactor("pex", pexR) return sw }) - // Create a peer, and add it to the peer set + // Create a peer, add it to the peer set and the addrbook. peer := createRandomPeer(false) - r.Switch.peers.Add(peer) - // Add the peer address to the address book - addr1, _ := NewNetAddressString(peer.NodeInfo().ListenAddr) - r.book.AddAddress(addr1, addr1) - // Add an address to the book that does not have a peer - _, addr2 := createRoutableAddr() - r.book.AddAddress(addr2, addr1) + pexR.Switch.peers.Add(peer) + addr1 := peer.NodeInfo().NetAddress() + pexR.book.AddAddress(addr1, addr1) - // Get the crawl status data - status := r.getCrawlStatus() + // Add a non-connected address to the book. + _, addr2 := createRoutableAddr() + pexR.book.AddAddress(addr2, addr1) + + // Get some peerInfos to crawl + peerInfos := pexR.getPeersToCrawl() // Make sure it has the proper number of elements - assert.Equal(2, len(status)) + assert.Equal(2, len(peerInfos)) - var num int - for _, cs := range status { - if cs.PeerID != "" { - num++ - } - } - // Check that only one has been identified as a connected peer - assert.Equal(1, num) + // TODO: test } func createRoutableAddr() (addr string, netAddr *NetAddress) { for { - addr = cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) - netAddr, _ = NewNetAddressString(addr) + var err error + addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) + netAddr, err = NewNetAddressString(addr) + if err != nil { + panic(err) + } if netAddr.Routable() { break } @@ -346,7 +342,7 @@ func createRandomPeer(outbound bool) *peer { addr, netAddr := createRoutableAddr() p := &peer{ nodeInfo: NodeInfo{ - ListenAddr: netAddr.String(), + ListenAddr: netAddr.DialString(), PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), }, outbound: outbound,