diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 85df029d8..51d9adb85 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -265,10 +265,12 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions) p2p.NopMetrics(), privKey, peerManager, - func() *types.NodeInfo { return &nodeInfo }, - transport, - ep, - p2p.RouterOptions{DialSleep: func(_ context.Context) {}}, + p2p.RouterOptions{ + NodeInfoProducer: func() *types.NodeInfo { return &nodeInfo }, + DialSleep: func(_ context.Context) {}, + LegacyTransport: transport, + LegacyEndpoint: ep, + }, ) require.NoError(t, err) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index f5117d75e..3df5f6430 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -73,9 +73,16 @@ type RouterOptions struct { // runtime.NumCPU. NumConcurrentDials func() int + // NodeInfoProducer returns a reference to the current + // NodeInfo object for use in adding channels. + NodeInfoProducer func() *types.NodeInfo + // UseLibP2P toggles the use of the new networking layer // within the router. UseLibP2P bool + + LegacyTransport Transport + LegacyEndpoint *Endpoint } const ( @@ -94,6 +101,26 @@ func (o *RouterOptions) Validate() error { return fmt.Errorf("queue type %q is not supported", o.QueueType) } + if o.NodeInfoProducer == nil { + return errors.New("must specify a NodeInfoProducer") + } + + if o.UseLibP2P { + if o.LegacyTransport != nil { + return errors.New("when using libp2p you must not specify legacy components (transport)") + } + if o.LegacyEndpoint != nil { + return errors.New("when using libp2p you must not specify legacy components (endpoint)") + } + } else { + if o.LegacyTransport == nil { + return errors.New("when using legacy p2p you must specify a transport") + } + if o.LegacyEndpoint == nil { + return errors.New("when using legacy p2p you must specify an endpoint") + } + } + switch { case o.IncomingConnectionWindow == 0: o.IncomingConnectionWindow = 100 * time.Millisecond @@ -179,39 +206,29 @@ type Router struct { // NewRouter creates a new Router. The given Transports must already be // listening on appropriate interfaces, and will be closed by the Router when it // stops. -func NewRouter( - logger log.Logger, - metrics *Metrics, - privKey crypto.PrivKey, - peerManager *PeerManager, - nodeInfoProducer func() *types.NodeInfo, - transport Transport, - endpoint *Endpoint, - options RouterOptions, -) (*Router, error) { - - if err := options.Validate(); err != nil { +func NewRouter(logger log.Logger, metrics *Metrics, key crypto.PrivKey, pm *PeerManager, opts RouterOptions) (*Router, error) { + if err := opts.Validate(); err != nil { return nil, err } - if options.UseLibP2P { + if opts.UseLibP2P { return nil, errors.New("libp2p is not supported") } router := &Router{ logger: logger, metrics: metrics, - privKey: privKey, - nodeInfoProducer: nodeInfoProducer, + privKey: key, + nodeInfoProducer: opts.NodeInfoProducer, connTracker: newConnTracker( - options.MaxIncomingConnectionAttempts, - options.IncomingConnectionWindow, + opts.MaxIncomingConnectionAttempts, + opts.IncomingConnectionWindow, ), chDescs: make([]*ChannelDescriptor, 0), - transport: transport, - endpoint: endpoint, - peerManager: peerManager, - options: options, + transport: opts.LegacyTransport, + endpoint: opts.LegacyEndpoint, + peerManager: pm, + options: opts, channelQueues: map[ChannelID]queue{}, channelMessages: map[ChannelID]proto.Message{}, peerQueues: map[types.NodeID]queue{}, diff --git a/internal/p2p/router_init_test.go b/internal/p2p/router_init_test.go index 20c3cb6dc..20c8e4692 100644 --- a/internal/p2p/router_init_test.go +++ b/internal/p2p/router_init_test.go @@ -11,19 +11,26 @@ import ( "github.com/tendermint/tendermint/types" ) +func getDefaultRouterOptions() RouterOptions { + return RouterOptions{ + LegacyTransport: &MemoryTransport{}, + LegacyEndpoint: &Endpoint{}, + NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} }, + } +} func TestRouter_ConstructQueueFactory(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() t.Run("ValidateOptionsPopulatesDefaultQueue", func(t *testing.T) { - opts := RouterOptions{} + opts := getDefaultRouterOptions() require.NoError(t, opts.Validate()) require.Equal(t, "fifo", opts.QueueType) }) t.Run("Default", func(t *testing.T) { require.Zero(t, os.Getenv("TM_P2P_QUEUE")) - opts := RouterOptions{} - r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) + opts := getDefaultRouterOptions() + r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts) require.NoError(t, err) require.NoError(t, r.setupQueueFactory(ctx)) @@ -31,8 +38,9 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { require.True(t, ok) }) t.Run("Fifo", func(t *testing.T) { - opts := RouterOptions{QueueType: queueTypeFifo} - r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) + opts := getDefaultRouterOptions() + opts.QueueType = queueTypeFifo + r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts) require.NoError(t, err) require.NoError(t, r.setupQueueFactory(ctx)) @@ -40,8 +48,9 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { require.True(t, ok) }) t.Run("Priority", func(t *testing.T) { - opts := RouterOptions{QueueType: queueTypePriority} - r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) + opts := getDefaultRouterOptions() + opts.QueueType = queueTypePriority + r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts) require.NoError(t, err) require.NoError(t, r.setupQueueFactory(ctx)) @@ -50,8 +59,9 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { defer q.close() }) t.Run("NonExistant", func(t *testing.T) { - opts := RouterOptions{QueueType: "fast"} - _, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts) + opts := getDefaultRouterOptions() + opts.QueueType = "fast" + _, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts) require.Error(t, err) require.Contains(t, err.Error(), "fast") }) diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 6fe6c50a0..d76030d5e 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -27,31 +27,64 @@ import ( ) func TestRouterConstruction(t *testing.T) { - opts := p2p.RouterOptions{UseLibP2P: true} - if err := opts.Validate(); err != nil { - t.Fatalf("options should validate: %v", err) - } - logger := log.NewNopLogger() - metrics := p2p.NopMetrics() + t.Run("Legacy", func(t *testing.T) { + opts := p2p.RouterOptions{ + UseLibP2P: false, + LegacyEndpoint: &p2p.Endpoint{}, + LegacyTransport: &p2p.MemoryTransport{}, + NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} }, + } + if err := opts.Validate(); err != nil { + t.Fatalf("options should validate: %v", err) + } - router, err := p2p.NewRouter( - logger, - metrics, - nil, // privkey - nil, // peermanager - func() *types.NodeInfo { return &types.NodeInfo{} }, - []p2p.Transport{}, - []p2p.Endpoint{}, - opts, - ) - if err == nil { - t.Error("support for libp2p does not exist, and should prevent the router from being constructed") - } else if err.Error() != "libp2p is not supported" { - t.Errorf("incorrect error: %q", err.Error()) - } - if router != nil { - t.Error("router was constructed when it should not have been") - } + logger := log.NewNopLogger() + metrics := p2p.NopMetrics() + + router, err := p2p.NewRouter( + logger, + metrics, + nil, // privkey + nil, // peermanager + opts, + ) + if err != nil { + t.Fatal("problem constructing legacy router", err) + } + if router == nil { + t.Error("router was not constructed when it should not have been") + } + }) + t.Run("LibP2P", func(t *testing.T) { + opts := p2p.RouterOptions{ + UseLibP2P: true, + LegacyEndpoint: nil, + LegacyTransport: nil, + NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} }, + } + if err := opts.Validate(); err != nil { + t.Fatalf("options should validate: %v", err) + } + + logger := log.NewNopLogger() + metrics := p2p.NopMetrics() + + router, err := p2p.NewRouter( + logger, + metrics, + nil, // privkey + nil, // peermanager + opts, + ) + if err == nil { + t.Error("support for libp2p does not exist, and should prevent the router from being constructed") + } else if err.Error() != "libp2p is not supported" { + t.Errorf("incorrect error: %q", err.Error()) + } + if router != nil { + t.Error("router was constructed and should not have have been") + } + }) } func echoReactor(ctx context.Context, channel *p2p.Channel) { @@ -140,10 +173,11 @@ func TestRouter_Channel_Basic(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - testnet.RandomNode().Transport, - &p2p.Endpoint{}, - p2p.RouterOptions{}, + p2p.RouterOptions{ + LegacyTransport: testnet.RandomNode().Transport, + LegacyEndpoint: &p2p.Endpoint{}, + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + }, ) require.NoError(t, err) @@ -439,10 +473,11 @@ func TestRouter_AcceptPeers(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + }, ) require.NoError(t, err) require.NoError(t, router.Start(ctx)) @@ -493,10 +528,11 @@ func TestRouter_AcceptPeers_Error(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + }, ) require.NoError(t, err) @@ -530,10 +566,11 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + }, ) require.NoError(t, err) @@ -581,10 +618,11 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + }, ) require.NoError(t, err) require.NoError(t, router.Start(ctx)) @@ -684,10 +722,11 @@ func TestRouter_DialPeers(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + }, ) require.NoError(t, err) require.NoError(t, router.Start(ctx)) @@ -769,11 +808,11 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, p2p.RouterOptions{ - DialSleep: func(_ context.Context) {}, + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + DialSleep: func(_ context.Context) {}, NumConcurrentDials: func() int { ncpu := runtime.NumCPU() if ncpu <= 3 { @@ -843,10 +882,11 @@ func TestRouter_EvictPeers(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + }, ) require.NoError(t, err) require.NoError(t, router.Start(ctx)) @@ -905,10 +945,11 @@ func TestRouter_ChannelCompatability(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + }, ) require.NoError(t, err) require.NoError(t, router.Start(ctx)) @@ -960,10 +1001,11 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { p2p.NopMetrics(), selfKey, peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, + p2p.RouterOptions{ + NodeInfoProducer: func() *types.NodeInfo { return &selfInfo }, + LegacyTransport: mockTransport, + LegacyEndpoint: &p2p.Endpoint{}, + }, ) require.NoError(t, err) require.NoError(t, router.Start(ctx)) diff --git a/node/setup.go b/node/setup.go index d6966800a..7d8bb3165 100644 --- a/node/setup.go +++ b/node/setup.go @@ -304,15 +304,16 @@ func createRouter( return nil, err } + opts := getRouterConfig(cfg, appClient) + opts.LegacyEndpoint = ep + opts.LegacyTransport = transport + opts.NodeInfoProducer = nodeInfoProducer return p2p.NewRouter( p2pLogger, p2pMetrics, nodeKey.PrivKey, peerManager, - nodeInfoProducer, - transport, - ep, - getRouterConfig(cfg, appClient), + opts, ) }