p2p: inject nodeinfo into router (#8261)

This commit is contained in:
Sam Kleinman
2022-04-06 14:02:07 -04:00
committed by GitHub
parent 2304ea70f7
commit d153388446
8 changed files with 53 additions and 53 deletions

View File

@@ -261,9 +261,9 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
router, err := p2p.NewRouter(
n.logger,
p2p.NopMetrics(),
nodeInfo,
privKey,
peerManager,
func() *types.NodeInfo { return &nodeInfo },
[]p2p.Transport{transport},
transport.Endpoints(),
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},

View File

@@ -150,7 +150,6 @@ type Router struct {
metrics *Metrics
options RouterOptions
nodeInfo types.NodeInfo
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []*ChannelDescriptor
@@ -162,8 +161,9 @@ type Router struct {
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
nodeInfoProducer func() *types.NodeInfo
// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
@@ -179,9 +179,9 @@ type Router struct {
func NewRouter(
logger log.Logger,
metrics *Metrics,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
peerManager *PeerManager,
nodeInfoProducer func() *types.NodeInfo,
transports []Transport,
endpoints []Endpoint,
options RouterOptions,
@@ -192,10 +192,10 @@ func NewRouter(
}
router := &Router{
logger: logger,
metrics: metrics,
nodeInfo: nodeInfo,
privKey: privKey,
logger: logger,
metrics: metrics,
privKey: privKey,
nodeInfoProducer: nodeInfoProducer,
connTracker: newConnTracker(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
@@ -284,7 +284,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
r.channelMessages[id] = messageType
// add the channel to the nodeInfo if it's not already there.
r.nodeInfo.AddChannel(uint16(chDesc.ID))
r.nodeInfoProducer().AddChannel(uint16(chDesc.ID))
for _, t := range r.transports {
t.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
@@ -715,7 +715,8 @@ func (r *Router) handshakePeer(
defer cancel()
}
peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey)
nodeInfo := r.nodeInfoProducer()
peerInfo, peerKey, err := conn.Handshake(ctx, *nodeInfo, r.privKey)
if err != nil {
return peerInfo, err
}
@@ -730,7 +731,7 @@ func (r *Router) handshakePeer(
return peerInfo, fmt.Errorf("expected to connect with peer %q, got %q",
expectID, peerInfo.NodeID)
}
if err := r.nodeInfo.CompatibleWith(peerInfo); err != nil {
if err := r.nodeInfoProducer().CompatibleWith(peerInfo); err != nil {
return peerInfo, ErrRejected{
err: err,
id: peerInfo.ID(),
@@ -930,7 +931,7 @@ func (r *Router) evictPeers(ctx context.Context) {
// NodeInfo returns a copy of the current NodeInfo. Used for testing.
func (r *Router) NodeInfo() types.NodeInfo {
return r.nodeInfo.Copy()
return r.nodeInfoProducer().Copy()
}
func (r *Router) setupQueueFactory(ctx context.Context) error {
@@ -957,11 +958,12 @@ func (r *Router) OnStart(ctx context.Context) error {
}
}
nodeInfo := r.nodeInfoProducer()
r.logger.Info(
"starting router",
"node_id", r.nodeInfo.NodeID,
"channels", r.nodeInfo.Channels,
"listen_addr", r.nodeInfo.ListenAddr,
"node_id", nodeInfo.NodeID,
"channels", nodeInfo.Channels,
"listen_addr", nodeInfo.ListenAddr,
"transports", len(r.transports),
)

View File

@@ -23,7 +23,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
@@ -32,7 +32,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
@@ -41,7 +41,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
@@ -51,7 +51,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})

View File

@@ -108,9 +108,9 @@ func TestRouter_Channel_Basic(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
nil,
nil,
p2p.RouterOptions{},
@@ -410,9 +410,9 @@ func TestRouter_AcceptPeers(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -464,9 +464,9 @@ func TestRouter_AcceptPeers_Error(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -501,9 +501,9 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -552,9 +552,9 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -655,9 +655,9 @@ func TestRouter_DialPeers(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -740,9 +740,9 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{
@@ -814,9 +814,9 @@ func TestRouter_EvictPeers(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -876,9 +876,9 @@ func TestRouter_ChannelCompatability(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -931,9 +931,9 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},

View File

@@ -54,10 +54,10 @@ type nodeImpl struct {
privValidator types.PrivValidator // local node's validator key
// network
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfoProducer func() *types.NodeInfo
nodeKey types.NodeKey // our node privkey
// services
eventSinks []indexer.EventSink
@@ -213,13 +213,6 @@ func makeNode(
}
}
// Determine whether we should attempt state sync.
stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
if err := consensus.NewHandshaker(
@@ -256,7 +249,7 @@ func makeNode(
makeCloser(closers))
}
router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey, peerManager, cfg, proxyApp)
router, err := createRouter(logger, nodeMetrics.p2p, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
@@ -288,6 +281,13 @@ func makeNode(
nodeMetrics.state,
)
// Determine whether we should attempt state sync.
stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
@@ -355,10 +355,10 @@ func makeNode(
genesisDoc: genDoc,
privValidator: privValidator,
peerManager: peerManager,
router: router,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
router: router,
nodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
nodeKey: nodeKey,
eventSinks: eventSinks,
@@ -458,7 +458,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
return err
}
n.rpcEnv.NodeInfo = n.nodeInfo
n.rpcEnv.NodeInfo = n.nodeInfoProducer().Copy()
// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
if n.config.RPC.ListenAddress != "" {

View File

@@ -147,7 +147,7 @@ func TestNodeSetAppVersion(t *testing.T) {
assert.Equal(t, state.Version.Consensus.App, appVersion)
// check version is set in node info
assert.Equal(t, n.nodeInfo.ProtocolVersion.App, appVersion)
assert.Equal(t, n.nodeInfoProducer().ProtocolVersion.App, appVersion)
}
func TestNodeSetPrivValTCP(t *testing.T) {

View File

@@ -29,7 +29,6 @@ type seedNodeImpl struct {
// network
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
isListening bool
@@ -75,7 +74,7 @@ func makeSeedNode(
closer)
}
router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey, peerManager, cfg, nil)
router, err := createRouter(logger, p2pMetrics, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
@@ -87,7 +86,6 @@ func makeSeedNode(
logger: logger,
genesisDoc: genDoc,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
router: router,

View File

@@ -363,7 +363,7 @@ func createPeerManager(
func createRouter(
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo types.NodeInfo,
nodeInfoProducer func() *types.NodeInfo,
nodeKey types.NodeKey,
peerManager *p2p.PeerManager,
cfg *config.Config,
@@ -392,9 +392,9 @@ func createRouter(
return p2p.NewRouter(
p2pLogger,
p2pMetrics,
nodeInfo,
nodeKey.PrivKey,
peerManager,
nodeInfoProducer,
[]p2p.Transport{transport},
[]p2p.Endpoint{ep},
getRouterConfig(cfg, appClient),