p2p: move transport options into struct (#8432)

This commit is contained in:
Sam Kleinman
2022-04-28 17:51:00 -04:00
committed by GitHub
parent 72405b450f
commit 4844af2b8d
5 changed files with 174 additions and 102 deletions

View File

@@ -265,10 +265,12 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
p2p.NopMetrics(),
privKey,
peerManager,
func() *types.NodeInfo { return &nodeInfo },
transport,
ep,
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
DialSleep: func(_ context.Context) {},
LegacyTransport: transport,
LegacyEndpoint: ep,
},
)
require.NoError(t, err)

View File

@@ -73,9 +73,16 @@ type RouterOptions struct {
// runtime.NumCPU.
NumConcurrentDials func() int
// NodeInfoProducer returns a reference to the current
// NodeInfo object for use in adding channels.
NodeInfoProducer func() *types.NodeInfo
// UseLibP2P toggles the use of the new networking layer
// within the router.
UseLibP2P bool
LegacyTransport Transport
LegacyEndpoint *Endpoint
}
const (
@@ -94,6 +101,26 @@ func (o *RouterOptions) Validate() error {
return fmt.Errorf("queue type %q is not supported", o.QueueType)
}
if o.NodeInfoProducer == nil {
return errors.New("must specify a NodeInfoProducer")
}
if o.UseLibP2P {
if o.LegacyTransport != nil {
return errors.New("when using libp2p you must not specify legacy components (transport)")
}
if o.LegacyEndpoint != nil {
return errors.New("when using libp2p you must not specify legacy components (endpoint)")
}
} else {
if o.LegacyTransport == nil {
return errors.New("when using legacy p2p you must specify a transport")
}
if o.LegacyEndpoint == nil {
return errors.New("when using legacy p2p you must specify an endpoint")
}
}
switch {
case o.IncomingConnectionWindow == 0:
o.IncomingConnectionWindow = 100 * time.Millisecond
@@ -179,39 +206,29 @@ type Router struct {
// NewRouter creates a new Router. The given Transports must already be
// listening on appropriate interfaces, and will be closed by the Router when it
// stops.
func NewRouter(
logger log.Logger,
metrics *Metrics,
privKey crypto.PrivKey,
peerManager *PeerManager,
nodeInfoProducer func() *types.NodeInfo,
transport Transport,
endpoint *Endpoint,
options RouterOptions,
) (*Router, error) {
if err := options.Validate(); err != nil {
func NewRouter(logger log.Logger, metrics *Metrics, key crypto.PrivKey, pm *PeerManager, opts RouterOptions) (*Router, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
if options.UseLibP2P {
if opts.UseLibP2P {
return nil, errors.New("libp2p is not supported")
}
router := &Router{
logger: logger,
metrics: metrics,
privKey: privKey,
nodeInfoProducer: nodeInfoProducer,
privKey: key,
nodeInfoProducer: opts.NodeInfoProducer,
connTracker: newConnTracker(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
opts.MaxIncomingConnectionAttempts,
opts.IncomingConnectionWindow,
),
chDescs: make([]*ChannelDescriptor, 0),
transport: transport,
endpoint: endpoint,
peerManager: peerManager,
options: options,
transport: opts.LegacyTransport,
endpoint: opts.LegacyEndpoint,
peerManager: pm,
options: opts,
channelQueues: map[ChannelID]queue{},
channelMessages: map[ChannelID]proto.Message{},
peerQueues: map[types.NodeID]queue{},

View File

@@ -11,19 +11,26 @@ import (
"github.com/tendermint/tendermint/types"
)
func getDefaultRouterOptions() RouterOptions {
return RouterOptions{
LegacyTransport: &MemoryTransport{},
LegacyEndpoint: &Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
}
func TestRouter_ConstructQueueFactory(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("ValidateOptionsPopulatesDefaultQueue", func(t *testing.T) {
opts := RouterOptions{}
opts := getDefaultRouterOptions()
require.NoError(t, opts.Validate())
require.Equal(t, "fifo", opts.QueueType)
})
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
@@ -31,8 +38,9 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = queueTypeFifo
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
@@ -40,8 +48,9 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = queueTypePriority
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
@@ -50,8 +59,9 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = "fast"
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})

View File

@@ -27,31 +27,64 @@ import (
)
func TestRouterConstruction(t *testing.T) {
opts := p2p.RouterOptions{UseLibP2P: true}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
}
logger := log.NewNopLogger()
metrics := p2p.NopMetrics()
t.Run("Legacy", func(t *testing.T) {
opts := p2p.RouterOptions{
UseLibP2P: false,
LegacyEndpoint: &p2p.Endpoint{},
LegacyTransport: &p2p.MemoryTransport{},
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
}
router, err := p2p.NewRouter(
logger,
metrics,
nil, // privkey
nil, // peermanager
func() *types.NodeInfo { return &types.NodeInfo{} },
[]p2p.Transport{},
[]p2p.Endpoint{},
opts,
)
if err == nil {
t.Error("support for libp2p does not exist, and should prevent the router from being constructed")
} else if err.Error() != "libp2p is not supported" {
t.Errorf("incorrect error: %q", err.Error())
}
if router != nil {
t.Error("router was constructed when it should not have been")
}
logger := log.NewNopLogger()
metrics := p2p.NopMetrics()
router, err := p2p.NewRouter(
logger,
metrics,
nil, // privkey
nil, // peermanager
opts,
)
if err != nil {
t.Fatal("problem constructing legacy router", err)
}
if router == nil {
t.Error("router was not constructed when it should not have been")
}
})
t.Run("LibP2P", func(t *testing.T) {
opts := p2p.RouterOptions{
UseLibP2P: true,
LegacyEndpoint: nil,
LegacyTransport: nil,
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
}
logger := log.NewNopLogger()
metrics := p2p.NopMetrics()
router, err := p2p.NewRouter(
logger,
metrics,
nil, // privkey
nil, // peermanager
opts,
)
if err == nil {
t.Error("support for libp2p does not exist, and should prevent the router from being constructed")
} else if err.Error() != "libp2p is not supported" {
t.Errorf("incorrect error: %q", err.Error())
}
if router != nil {
t.Error("router was constructed and should not have have been")
}
})
}
func echoReactor(ctx context.Context, channel *p2p.Channel) {
@@ -140,10 +173,11 @@ func TestRouter_Channel_Basic(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
testnet.RandomNode().Transport,
&p2p.Endpoint{},
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: testnet.RandomNode().Transport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
@@ -439,10 +473,11 @@ func TestRouter_AcceptPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -493,10 +528,11 @@ func TestRouter_AcceptPeers_Error(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
@@ -530,10 +566,11 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
@@ -581,10 +618,11 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -684,10 +722,11 @@ func TestRouter_DialPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -769,11 +808,11 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{
DialSleep: func(_ context.Context) {},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
DialSleep: func(_ context.Context) {},
NumConcurrentDials: func() int {
ncpu := runtime.NumCPU()
if ncpu <= 3 {
@@ -843,10 +882,11 @@ func TestRouter_EvictPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -905,10 +945,11 @@ func TestRouter_ChannelCompatability(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -960,10 +1001,11 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))

View File

@@ -304,15 +304,16 @@ func createRouter(
return nil, err
}
opts := getRouterConfig(cfg, appClient)
opts.LegacyEndpoint = ep
opts.LegacyTransport = transport
opts.NodeInfoProducer = nodeInfoProducer
return p2p.NewRouter(
p2pLogger,
p2pMetrics,
nodeKey.PrivKey,
peerManager,
nodeInfoProducer,
transport,
ep,
getRouterConfig(cfg, appClient),
opts,
)
}