From d1533884463da76ccee02e1392c63789dff93a83 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 6 Apr 2022 14:02:07 -0400 Subject: [PATCH] p2p: inject nodeinfo into router (#8261) --- internal/p2p/p2ptest/network.go | 2 +- internal/p2p/router.go | 32 ++++++++++++++++-------------- internal/p2p/router_init_test.go | 8 ++++---- internal/p2p/router_test.go | 20 +++++++++---------- node/node.go | 34 ++++++++++++++++---------------- node/node_test.go | 2 +- node/seed.go | 4 +--- node/setup.go | 4 ++-- 8 files changed, 53 insertions(+), 53 deletions(-) diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 6016ce0a5..cde14e721 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -261,9 +261,9 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions) router, err := p2p.NewRouter( n.logger, p2p.NopMetrics(), - nodeInfo, privKey, peerManager, + func() *types.NodeInfo { return &nodeInfo }, []p2p.Transport{transport}, transport.Endpoints(), p2p.RouterOptions{DialSleep: func(_ context.Context) {}}, diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 025769592..ca9536900 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -150,7 +150,6 @@ type Router struct { metrics *Metrics options RouterOptions - nodeInfo types.NodeInfo privKey crypto.PrivKey peerManager *PeerManager chDescs []*ChannelDescriptor @@ -162,8 +161,9 @@ type Router struct { peerMtx sync.RWMutex peerQueues map[types.NodeID]queue // outbound messages per peer for all channels // the channels that the peer queue has open - peerChannels map[types.NodeID]ChannelIDSet - queueFactory func(int) queue + peerChannels map[types.NodeID]ChannelIDSet + queueFactory func(int) queue + nodeInfoProducer func() *types.NodeInfo // FIXME: We don't strictly need to use a mutex for this if we seal the // channels on router start. This depends on whether we want to allow @@ -179,9 +179,9 @@ type Router struct { func NewRouter( logger log.Logger, metrics *Metrics, - nodeInfo types.NodeInfo, privKey crypto.PrivKey, peerManager *PeerManager, + nodeInfoProducer func() *types.NodeInfo, transports []Transport, endpoints []Endpoint, options RouterOptions, @@ -192,10 +192,10 @@ func NewRouter( } router := &Router{ - logger: logger, - metrics: metrics, - nodeInfo: nodeInfo, - privKey: privKey, + logger: logger, + metrics: metrics, + privKey: privKey, + nodeInfoProducer: nodeInfoProducer, connTracker: newConnTracker( options.MaxIncomingConnectionAttempts, options.IncomingConnectionWindow, @@ -284,7 +284,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C r.channelMessages[id] = messageType // add the channel to the nodeInfo if it's not already there. - r.nodeInfo.AddChannel(uint16(chDesc.ID)) + r.nodeInfoProducer().AddChannel(uint16(chDesc.ID)) for _, t := range r.transports { t.AddChannelDescriptors([]*ChannelDescriptor{chDesc}) @@ -715,7 +715,8 @@ func (r *Router) handshakePeer( defer cancel() } - peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey) + nodeInfo := r.nodeInfoProducer() + peerInfo, peerKey, err := conn.Handshake(ctx, *nodeInfo, r.privKey) if err != nil { return peerInfo, err } @@ -730,7 +731,7 @@ func (r *Router) handshakePeer( return peerInfo, fmt.Errorf("expected to connect with peer %q, got %q", expectID, peerInfo.NodeID) } - if err := r.nodeInfo.CompatibleWith(peerInfo); err != nil { + if err := r.nodeInfoProducer().CompatibleWith(peerInfo); err != nil { return peerInfo, ErrRejected{ err: err, id: peerInfo.ID(), @@ -930,7 +931,7 @@ func (r *Router) evictPeers(ctx context.Context) { // NodeInfo returns a copy of the current NodeInfo. Used for testing. func (r *Router) NodeInfo() types.NodeInfo { - return r.nodeInfo.Copy() + return r.nodeInfoProducer().Copy() } func (r *Router) setupQueueFactory(ctx context.Context) error { @@ -957,11 +958,12 @@ func (r *Router) OnStart(ctx context.Context) error { } } + nodeInfo := r.nodeInfoProducer() r.logger.Info( "starting router", - "node_id", r.nodeInfo.NodeID, - "channels", r.nodeInfo.Channels, - "listen_addr", r.nodeInfo.ListenAddr, + "node_id", nodeInfo.NodeID, + "channels", nodeInfo.Channels, + "listen_addr", nodeInfo.ListenAddr, "transports", len(r.transports), ) diff --git a/internal/p2p/router_init_test.go b/internal/p2p/router_init_test.go index d58c79487..20c3cb6dc 100644 --- a/internal/p2p/router_init_test.go +++ b/internal/p2p/router_init_test.go @@ -23,7 +23,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { t.Run("Default", func(t *testing.T) { require.Zero(t, os.Getenv("TM_P2P_QUEUE")) opts := RouterOptions{} - r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) require.NoError(t, err) require.NoError(t, r.setupQueueFactory(ctx)) @@ -32,7 +32,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { }) t.Run("Fifo", func(t *testing.T) { opts := RouterOptions{QueueType: queueTypeFifo} - r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) require.NoError(t, err) require.NoError(t, r.setupQueueFactory(ctx)) @@ -41,7 +41,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { }) t.Run("Priority", func(t *testing.T) { opts := RouterOptions{QueueType: queueTypePriority} - r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) require.NoError(t, err) require.NoError(t, r.setupQueueFactory(ctx)) @@ -51,7 +51,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { }) t.Run("NonExistant", func(t *testing.T) { opts := RouterOptions{QueueType: "fast"} - _, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) + _, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) require.Error(t, err) require.Contains(t, err.Error(), "fast") }) diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 5facfdaff..7ef77a16d 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -108,9 +108,9 @@ func TestRouter_Channel_Basic(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, nil, nil, p2p.RouterOptions{}, @@ -410,9 +410,9 @@ func TestRouter_AcceptPeers(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, @@ -464,9 +464,9 @@ func TestRouter_AcceptPeers_Error(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, @@ -501,9 +501,9 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, @@ -552,9 +552,9 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, @@ -655,9 +655,9 @@ func TestRouter_DialPeers(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, @@ -740,9 +740,9 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{ @@ -814,9 +814,9 @@ func TestRouter_EvictPeers(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, @@ -876,9 +876,9 @@ func TestRouter_ChannelCompatability(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, @@ -931,9 +931,9 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { router, err := p2p.NewRouter( log.NewNopLogger(), p2p.NopMetrics(), - selfInfo, selfKey, peerManager, + func() *types.NodeInfo { return &selfInfo }, []p2p.Transport{mockTransport}, nil, p2p.RouterOptions{}, diff --git a/node/node.go b/node/node.go index 7c3ca1268..e1f4205ad 100644 --- a/node/node.go +++ b/node/node.go @@ -54,10 +54,10 @@ type nodeImpl struct { privValidator types.PrivValidator // local node's validator key // network - peerManager *p2p.PeerManager - router *p2p.Router - nodeInfo types.NodeInfo - nodeKey types.NodeKey // our node privkey + peerManager *p2p.PeerManager + router *p2p.Router + nodeInfoProducer func() *types.NodeInfo + nodeKey types.NodeKey // our node privkey // services eventSinks []indexer.EventSink @@ -213,13 +213,6 @@ func makeNode( } } - // Determine whether we should attempt state sync. - stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey) - if stateSync && state.LastBlockHeight > 0 { - logger.Info("Found local state with non-zero height, skipping state sync") - stateSync = false - } - // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. if err := consensus.NewHandshaker( @@ -256,7 +249,7 @@ func makeNode( makeCloser(closers)) } - router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey, peerManager, cfg, proxyApp) + router, err := createRouter(logger, nodeMetrics.p2p, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, proxyApp) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), @@ -288,6 +281,13 @@ func makeNode( nodeMetrics.state, ) + // Determine whether we should attempt state sync. + stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey) + if stateSync && state.LastBlockHeight > 0 { + logger.Info("Found local state with non-zero height, skipping state sync") + stateSync = false + } + // Determine whether we should do block sync. This must happen after the handshake, since the // app may modify the validator set, specifying ourself as the only validator. blockSync := !onlyValidatorIsUs(state, pubKey) @@ -355,10 +355,10 @@ func makeNode( genesisDoc: genDoc, privValidator: privValidator, - peerManager: peerManager, - router: router, - nodeInfo: nodeInfo, - nodeKey: nodeKey, + peerManager: peerManager, + router: router, + nodeInfoProducer: func() *types.NodeInfo { return &nodeInfo }, + nodeKey: nodeKey, eventSinks: eventSinks, @@ -458,7 +458,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { return err } - n.rpcEnv.NodeInfo = n.nodeInfo + n.rpcEnv.NodeInfo = n.nodeInfoProducer().Copy() // Start the RPC server before the P2P server // so we can eg. receive txs for the first block if n.config.RPC.ListenAddress != "" { diff --git a/node/node_test.go b/node/node_test.go index 1a1fa6f81..86ed7960c 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -147,7 +147,7 @@ func TestNodeSetAppVersion(t *testing.T) { assert.Equal(t, state.Version.Consensus.App, appVersion) // check version is set in node info - assert.Equal(t, n.nodeInfo.ProtocolVersion.App, appVersion) + assert.Equal(t, n.nodeInfoProducer().ProtocolVersion.App, appVersion) } func TestNodeSetPrivValTCP(t *testing.T) { diff --git a/node/seed.go b/node/seed.go index 6da7ff05f..a0b71e411 100644 --- a/node/seed.go +++ b/node/seed.go @@ -29,7 +29,6 @@ type seedNodeImpl struct { // network peerManager *p2p.PeerManager router *p2p.Router - nodeInfo types.NodeInfo nodeKey types.NodeKey // our node privkey isListening bool @@ -75,7 +74,7 @@ func makeSeedNode( closer) } - router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey, peerManager, cfg, nil) + router, err := createRouter(logger, p2pMetrics, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, nil) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), @@ -87,7 +86,6 @@ func makeSeedNode( logger: logger, genesisDoc: genDoc, - nodeInfo: nodeInfo, nodeKey: nodeKey, peerManager: peerManager, router: router, diff --git a/node/setup.go b/node/setup.go index 07626d611..e87fac79c 100644 --- a/node/setup.go +++ b/node/setup.go @@ -363,7 +363,7 @@ func createPeerManager( func createRouter( logger log.Logger, p2pMetrics *p2p.Metrics, - nodeInfo types.NodeInfo, + nodeInfoProducer func() *types.NodeInfo, nodeKey types.NodeKey, peerManager *p2p.PeerManager, cfg *config.Config, @@ -392,9 +392,9 @@ func createRouter( return p2p.NewRouter( p2pLogger, p2pMetrics, - nodeInfo, nodeKey.PrivKey, peerManager, + nodeInfoProducer, []p2p.Transport{transport}, []p2p.Endpoint{ep}, getRouterConfig(cfg, appClient),