diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go
index 1daba3f14..413188f93 100644
--- a/internal/p2p/p2ptest/network.go
+++ b/internal/p2p/p2ptest/network.go
@@ -95,10 +95,8 @@ func (n *Network) Start(t *testing.T) {
 		select {
 		case peerUpdate := <-sourceSub.Updates():
-			require.Equal(t, p2p.PeerUpdate{
-				NodeID: targetNode.NodeID,
-				Status: p2p.PeerStatusUp,
-			}, peerUpdate)
+			require.Equal(t, targetNode.NodeID, peerUpdate.NodeID)
+			require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
 		case <-time.After(3 * time.Second):
 			require.Fail(t, "timed out waiting for peer", "%v dialing %v",
 				sourceNode.NodeID, targetNode.NodeID)
@@ -106,6 +104,7 @@ func (n *Network) Start(t *testing.T) {
 		select {
 		case peerUpdate := <-targetSub.Updates():
+			peerUpdate.Channels = nil
 			require.Equal(t, p2p.PeerUpdate{
 				NodeID: sourceNode.NodeID,
 				Status: p2p.PeerStatusUp,
diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go
index 3598baba0..5be38cf17 100644
--- a/internal/p2p/p2ptest/require.go
+++ b/internal/p2p/p2ptest/require.go
@@ -120,11 +120,10 @@ func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUp
 	select {
 	case update := <-peerUpdates.Updates():
-		require.Equal(t, expect, update, "peer update did not match")
-
+		require.Equal(t, expect.NodeID, update.NodeID, "node id did not match")
+		require.Equal(t, expect.Status, update.Status, "statuses did not match")
 	case <-peerUpdates.Done():
 		require.Fail(t, "peer updates subscription is closed")
-
 	case <-timer.C:
 		require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
 	}
@@ -142,7 +141,11 @@ func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.Pee
 		case update := <-peerUpdates.Updates():
 			actual = append(actual, update)
 			if len(actual) == len(expect) {
-				require.Equal(t, expect, actual)
+				for idx := range expect {
+					require.Equal(t, expect[idx].NodeID, actual[idx].NodeID)
+					require.Equal(t, expect[idx].Status, actual[idx].Status)
+				}
+
 				return
 			}
diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go
index 62f2c1a73..4f64c0677 100644
--- a/internal/p2p/peermanager.go
+++ b/internal/p2p/peermanager.go
@@ -47,8 +47,9 @@ const (
 // PeerUpdate is a peer update event sent via PeerUpdates.
 type PeerUpdate struct {
-	NodeID types.NodeID
-	Status PeerStatus
+	NodeID   types.NodeID
+	Status   PeerStatus
+	Channels ChannelIDSet
 }
 
 // PeerUpdates is a peer update subscription with notifications about peer
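Note: the struct change above is the heart of the patch — every broadcast peer update now carries the peer's channel set. A minimal, self-contained sketch of what a subscriber sees; the types below are stand-ins mirroring the diff (the real ones live in `internal/p2p` and are not importable from outside the module, and the underlying type of `ChannelID` is an assumption here):

```go
package main

import "fmt"

// Stand-in types mirroring the diff; not the real internal/p2p declarations.
type (
	ChannelID    uint16
	ChannelIDSet map[ChannelID]struct{}
	PeerStatus   string
	NodeID       string
)

// PeerUpdate now carries the peer's open channels alongside its status.
type PeerUpdate struct {
	NodeID   NodeID
	Status   PeerStatus
	Channels ChannelIDSet
}

func main() {
	update := PeerUpdate{
		NodeID:   "c0ffee",
		Status:   "up",
		Channels: ChannelIDSet{0x60: {}, 0x61: {}},
	}
	// A subscriber can now inspect capabilities instead of assuming them.
	if _, ok := update.Channels[0x60]; ok {
		fmt.Printf("peer %s supports channel 0x60\n", update.NodeID)
	}
}
```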
@@ -697,19 +698,23 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
 	return nil
 }
 
-// Ready marks a peer as ready, broadcasting status updates to subscribers. The
-// peer must already be marked as connected. This is separate from Dialed() and
-// Accepted() to allow the router to set up its internal queues before reactors
-// start sending messages.
-func (m *PeerManager) Ready(peerID types.NodeID) {
+// Ready marks a peer as ready, broadcasting status updates to
+// subscribers. The peer must already be marked as connected. This is
+// separate from Dialed() and Accepted() to allow the router to set up
+// its internal queues before reactors start sending messages. The
+// channels set here are passed in the peer update broadcast to
+// reactors, which can then mediate their own behavior based on the
+// capability of the peers.
+func (m *PeerManager) Ready(peerID types.NodeID, channels ChannelIDSet) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
 
 	if m.connected[peerID] {
 		m.ready[peerID] = true
 		m.broadcast(PeerUpdate{
-			NodeID: peerID,
-			Status: PeerStatusUp,
+			NodeID:   peerID,
+			Status:   PeerStatusUp,
+			Channels: channels,
 		})
 	}
 }
@@ -1242,6 +1247,7 @@ type peerInfo struct {
 
 	// These fields are ephemeral, i.e. not persisted to the database.
 	Persistent bool
+	Seed       bool
 	Height     int64
 	FixedScore PeerScore // mainly for tests
@@ -1264,6 +1270,7 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
 			return nil, err
 		}
 		p.AddressInfo[addressInfo.Address] = addressInfo
+
 	}
 	return p, p.Validate()
 }
diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go
index 4f6eecd76..2868c14e4 100644
--- a/internal/p2p/peermanager_test.go
+++ b/internal/p2p/peermanager_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/fortytw2/leaktest"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"
 
@@ -1271,7 +1272,7 @@ func TestPeerManager_Ready(t *testing.T) {
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
 
 	// Marking a as ready should transition it to PeerStatusUp and send an update.
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
 	require.Equal(t, p2p.PeerUpdate{
 		NodeID: a.NodeID,
@@ -1283,11 +1284,31 @@ func TestPeerManager_Ready(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
-	peerManager.Ready(b.NodeID)
+	peerManager.Ready(b.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
 	require.Empty(t, sub.Updates())
 }
 
+func TestPeerManager_Ready_Channels(t *testing.T) {
+	pm, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
+	require.NoError(t, err)
+
+	sub := pm.Subscribe()
+
+	a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
+	added, err := pm.Add(a)
+	require.NoError(t, err)
+	require.True(t, added)
+	require.NoError(t, pm.Accepted(a.NodeID))
+
+	pm.Ready(a.NodeID, p2p.ChannelIDSet{42: struct{}{}})
+	require.NotEmpty(t, sub.Updates())
+	update := <-sub.Updates()
+	assert.Equal(t, a.NodeID, update.NodeID)
+	require.True(t, update.Channels.Contains(42))
+	require.False(t, update.Channels.Contains(48))
+}
+
 // See TryEvictNext for most tests, this just tests blocking behavior.
 func TestPeerManager_EvictNext(t *testing.T) {
 	a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
@@ -1299,7 +1320,7 @@ func TestPeerManager_EvictNext(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 
 	// Since there are no peers to evict, EvictNext should block until timeout.
 	timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@@ -1332,7 +1353,7 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 
 	// Spawn a goroutine to error a peer after a delay.
 	go func() {
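Note: every pre-existing call site passes `nil` for the new argument. That is safe because `Ready` only stores the set in the broadcast update, and consumers check membership through the `Contains` method added in router.go below — and reading from a nil map in Go simply returns the zero value. A quick sketch with the same stand-in types:

```go
package main

import "fmt"

type ChannelID uint16
type ChannelIDSet map[ChannelID]struct{}

// Contains is copied from the router.go change further down.
func (cs ChannelIDSet) Contains(id ChannelID) bool {
	_, ok := cs[id]
	return ok
}

func main() {
	var cs ChannelIDSet          // nil, as in peerManager.Ready(a.NodeID, nil)
	fmt.Println(cs.Contains(42)) // false: reading from a nil map is safe in Go
	cs = ChannelIDSet{42: {}}
	fmt.Println(cs.Contains(42)) // true, as TestPeerManager_Ready_Channels expects
}
```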
@@ -1364,7 +1385,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 
 	// Spawn a goroutine to upgrade to b with a delay.
 	go func() {
@@ -1402,7 +1423,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 
 	// Spawn a goroutine to upgrade b with a delay.
 	go func() {
@@ -1434,7 +1455,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) {
 
 	// Connecting to a won't evict anything either.
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 
 	// But if a errors it should be evicted.
 	peerManager.Errored(a.NodeID, errors.New("foo"))
@@ -1479,7 +1500,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
 	_, err = peerManager.Add(a)
 	require.NoError(t, err)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{
@@ -1531,7 +1552,7 @@ func TestPeerManager_Errored(t *testing.T) {
 	require.Zero(t, evict)
 
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	evict, err = peerManager.TryEvictNext()
 	require.NoError(t, err)
 	require.Zero(t, evict)
@@ -1562,7 +1583,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
 	require.NoError(t, peerManager.Accepted(a.NodeID))
 	require.Empty(t, sub.Updates())
 
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
 
@@ -1579,7 +1600,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
 	require.NoError(t, peerManager.Dialed(a))
 	require.Empty(t, sub.Updates())
 
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
 
@@ -1619,7 +1640,7 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
 	require.NoError(t, peerManager.Accepted(a.NodeID))
 	require.Empty(t, sub.Updates())
 
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
 
@@ -1649,7 +1670,7 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 
 	expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
 	require.NotEmpty(t, s1)
diff --git a/internal/p2p/router.go b/internal/p2p/router.go
index d0a8e5360..6a96d0172 100644
--- a/internal/p2p/router.go
+++ b/internal/p2p/router.go
@@ -260,7 +260,7 @@ type Router struct {
 	peerMtx    sync.RWMutex
 	peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
 	// the channels that the peer queue has open
-	peerChannels map[types.NodeID]channelIDs
+	peerChannels map[types.NodeID]ChannelIDSet
 	queueFactory func(int) queue
 
 	// FIXME: We don't strictly need to use a mutex for this if we seal the
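Note: `peerChannels` is written when a peer's connection is established and guarded by the same `peerMtx` that protects `peerQueues`. A simplified sketch of that bookkeeping pattern; `registry`, `set`, and `get` are hypothetical names for illustration, not the router's actual API:

```go
package main

import (
	"fmt"
	"sync"
)

type ChannelID uint16
type ChannelIDSet map[ChannelID]struct{}
type NodeID string

// registry is a hypothetical stand-in for the router's mutex-guarded
// peerChannels map[types.NodeID]ChannelIDSet.
type registry struct {
	mtx          sync.RWMutex
	peerChannels map[NodeID]ChannelIDSet
}

func (r *registry) set(id NodeID, chs ChannelIDSet) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.peerChannels[id] = chs
}

func (r *registry) get(id NodeID) ChannelIDSet {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	return r.peerChannels[id]
}

func main() {
	r := &registry{peerChannels: make(map[NodeID]ChannelIDSet)}
	r.set("aa", ChannelIDSet{0x60: {}})
	fmt.Println(len(r.get("aa"))) // 1
}
```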
@@ -306,7 +306,7 @@ func NewRouter(
 		channelQueues:   map[ChannelID]queue{},
 		channelMessages: map[ChannelID]proto.Message{},
 		peerQueues:      map[types.NodeID]queue{},
-		peerChannels:    make(map[types.NodeID]channelIDs),
+		peerChannels:    make(map[types.NodeID]ChannelIDSet),
 	}
 
 	router.BaseService = service.NewBaseService(logger, "router", router)
@@ -739,7 +739,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
 	go r.routePeer(address.NodeID, conn, toChannelIDs(peerInfo.Channels))
 }
 
-func (r *Router) getOrMakeQueue(peerID types.NodeID, channels channelIDs) queue {
+func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
 	r.peerMtx.Lock()
 	defer r.peerMtx.Unlock()
 
@@ -851,9 +851,9 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
 // routePeer routes inbound and outbound messages between a peer and the reactor
 // channels. It will close the given connection and send queue when done, or if
 // they are closed elsewhere it will cause this method to shut down and return.
-func (r *Router) routePeer(peerID types.NodeID, conn Connection, channels channelIDs) {
+func (r *Router) routePeer(peerID types.NodeID, conn Connection, channels ChannelIDSet) {
 	r.metrics.Peers.Add(1)
-	r.peerManager.Ready(peerID)
+	r.peerManager.Ready(peerID, channels)
 
 	sendQueue := r.getOrMakeQueue(peerID, channels)
 	defer func() {
@@ -1092,9 +1092,14 @@ func (r *Router) stopCtx() context.Context {
 	return ctx
 }
 
-type channelIDs map[ChannelID]struct{}
+type ChannelIDSet map[ChannelID]struct{}
 
-func toChannelIDs(bytes []byte) channelIDs {
+func (cs ChannelIDSet) Contains(id ChannelID) bool {
+	_, ok := cs[id]
+	return ok
+}
+
+func toChannelIDs(bytes []byte) ChannelIDSet {
 	c := make(map[ChannelID]struct{}, len(bytes))
 	for _, b := range bytes {
 		c[ChannelID(b)] = struct{}{}
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index d0c656318..7625e53e0 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -885,7 +885,17 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
-		r.peers.Append(peerUpdate.NodeID)
+		if peerUpdate.Channels.Contains(SnapshotChannel) &&
+			peerUpdate.Channels.Contains(ChunkChannel) &&
+			peerUpdate.Channels.Contains(LightBlockChannel) &&
+			peerUpdate.Channels.Contains(ParamsChannel) {
+
+			r.peers.Append(peerUpdate.NodeID)
+
+		} else {
+			r.Logger.Error("could not use peer for statesync", "peer", peerUpdate.NodeID)
+		}
+
 	case p2p.PeerStatusDown:
 		r.peers.Remove(peerUpdate.NodeID)
 	}
@@ -898,6 +908,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
+
 		newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
 		r.providers[peerUpdate.NodeID] = newProvider
 		err := r.syncer.AddPeer(peerUpdate.NodeID)
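Note: the statesync gate above only admits peers that advertise all four channels the reactor depends on. A standalone sketch of the same check; the channel ID values here are placeholders, and the real `SnapshotChannel`, `ChunkChannel`, `LightBlockChannel`, and `ParamsChannel` constants are defined elsewhere in the statesync package:

```go
package main

import "fmt"

type ChannelID uint16
type ChannelIDSet map[ChannelID]struct{}

func (cs ChannelIDSet) Contains(id ChannelID) bool {
	_, ok := cs[id]
	return ok
}

// Placeholder IDs; the real constants live in the statesync package.
const (
	SnapshotChannel   ChannelID = 0x60
	ChunkChannel      ChannelID = 0x61
	LightBlockChannel ChannelID = 0x62
	ParamsChannel     ChannelID = 0x63
)

// usableForStateSync mirrors the processPeerUpdate gate: every channel
// statesync relies on must be present before the peer is used.
func usableForStateSync(chs ChannelIDSet) bool {
	return chs.Contains(SnapshotChannel) &&
		chs.Contains(ChunkChannel) &&
		chs.Contains(LightBlockChannel) &&
		chs.Contains(ParamsChannel)
}

func main() {
	full := ChannelIDSet{
		SnapshotChannel: {}, ChunkChannel: {},
		LightBlockChannel: {}, ParamsChannel: {},
	}
	partial := ChannelIDSet{SnapshotChannel: {}}
	fmt.Println(usableForStateSync(full))    // true
	fmt.Println(usableForStateSync(partial)) // false
}
```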
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index 0a9034576..759869f42 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -453,10 +453,22 @@ func TestReactor_BlockProviders(t *testing.T) {
 	rts.peerUpdateCh <- p2p.PeerUpdate{
 		NodeID: types.NodeID("aa"),
 		Status: p2p.PeerStatusUp,
+		Channels: p2p.ChannelIDSet{
+			SnapshotChannel:   struct{}{},
+			ChunkChannel:      struct{}{},
+			LightBlockChannel: struct{}{},
+			ParamsChannel:     struct{}{},
+		},
 	}
 	rts.peerUpdateCh <- p2p.PeerUpdate{
 		NodeID: types.NodeID("bb"),
 		Status: p2p.PeerStatusUp,
+		Channels: p2p.ChannelIDSet{
+			SnapshotChannel:   struct{}{},
+			ChunkChannel:      struct{}{},
+			LightBlockChannel: struct{}{},
+			ParamsChannel:     struct{}{},
+		},
 	}
 
 	closeCh := make(chan struct{})
@@ -591,6 +603,12 @@ func TestReactor_Backfill(t *testing.T) {
 		rts.peerUpdateCh <- p2p.PeerUpdate{
 			NodeID: types.NodeID(peer),
 			Status: p2p.PeerStatusUp,
+			Channels: p2p.ChannelIDSet{
+				SnapshotChannel:   struct{}{},
+				ChunkChannel:      struct{}{},
+				LightBlockChannel: struct{}{},
+				ParamsChannel:     struct{}{},
+			},
 		}
 	}
 
@@ -789,6 +807,12 @@ func graduallyAddPeers(
 			peerUpdateCh <- p2p.PeerUpdate{
 				NodeID: factory.RandomNodeID(),
 				Status: p2p.PeerStatusUp,
+				Channels: p2p.ChannelIDSet{
+					SnapshotChannel:   struct{}{},
+					ChunkChannel:      struct{}{},
+					LightBlockChannel: struct{}{},
+					ParamsChannel:     struct{}{},
+				},
 			}
 		case <-closeCh:
 			return
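Note: in production the channel sets originate from the peer's NodeInfo, which advertises channels as a byte slice; `toChannelIDs` in router.go converts that into a `ChannelIDSet` before `Ready` broadcasts it. A round-trip sketch using the same stand-in types as the earlier examples (the byte values are illustrative):

```go
package main

import "fmt"

type ChannelID uint16
type ChannelIDSet map[ChannelID]struct{}

func (cs ChannelIDSet) Contains(id ChannelID) bool {
	_, ok := cs[id]
	return ok
}

// toChannelIDs mirrors the helper in router.go: the wire format
// advertises channels as raw bytes, one per channel ID.
func toChannelIDs(bytes []byte) ChannelIDSet {
	c := make(map[ChannelID]struct{}, len(bytes))
	for _, b := range bytes {
		c[ChannelID(b)] = struct{}{}
	}
	return c
}

func main() {
	// e.g. a peer advertising four channels in its NodeInfo
	channels := toChannelIDs([]byte{0x60, 0x61, 0x62, 0x63})
	fmt.Println(channels.Contains(0x62)) // true
	fmt.Println(channels.Contains(0x42)) // false
}
```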