p2p: accept should not abort on first error (#8759)

This commit is contained in:
Sam Kleinman
2022-06-14 19:12:53 -04:00
committed by GitHub
parent bf1cb89bb7
commit 979a6a1b13
2 changed files with 43 additions and 70 deletions

View File

@@ -470,14 +470,17 @@ func (r *Router) dialSleep(ctx context.Context) {
func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
for {
conn, err := transport.Accept(ctx)
switch err {
case nil:
case io.EOF:
r.logger.Debug("stopping accept routine", "transport", transport)
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
r.logger.Debug("stopping accept routine", "transport", transport, "err", "context canceled")
return
default:
case errors.Is(err, io.EOF):
r.logger.Debug("stopping accept routine", "transport", transport, "err", "EOF")
return
case err != nil:
// in this case we got an error from the net.Listener.
r.logger.Error("failed to accept connection", "transport", transport, "err", err)
return
continue
}
incomingIP := conn.RemoteEndpoint().IP
@@ -489,7 +492,7 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
"close_err", closeErr,
)
return
continue
}
// Spawn a goroutine for the handshake, to avoid head-of-line blocking.

View File

@@ -442,78 +442,48 @@ func TestRouter_AcceptPeers(t *testing.T) {
}
}
func TestRouter_AcceptPeers_Error(t *testing.T) {
t.Cleanup(leaktest.Check(t))
func TestRouter_AcceptPeers_Errors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, err := range []error{io.EOF, context.Canceled, context.DeadlineExceeded} {
t.Run(err.Error(), func(t *testing.T) {
t.Cleanup(leaktest.Check(t))
// Set up a mock transport that returns an error, which should prevent
// the router from calling Accept again.
mockTransport := &mocks.Transport{}
mockTransport.On("String").Maybe().Return("mock")
mockTransport.On("Accept", mock.Anything).Once().Return(nil, errors.New("boom"))
mockTransport.On("Close").Return(nil)
mockTransport.On("Listen", mock.Anything).Return(nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
// Set up a mock transport that returns io.EOF once, which should prevent
// the router from calling Accept again.
mockTransport := &mocks.Transport{}
mockTransport.On("String").Maybe().Return("mock")
mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
mockTransport.On("Close").Return(nil)
mockTransport.On("Listen", mock.Anything).Return(nil)
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
time.Sleep(time.Second)
router.Stop()
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
mockTransport.AssertExpectations(t)
}
require.NoError(t, router.Start(ctx))
time.Sleep(time.Second)
router.Stop()
func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
t.Cleanup(leaktest.Check(t))
mockTransport.AssertExpectations(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
})
// Set up a mock transport that returns io.EOF once, which should prevent
// the router from calling Accept again.
mockTransport := &mocks.Transport{}
mockTransport.On("String").Maybe().Return("mock")
mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
mockTransport.On("Close").Return(nil)
mockTransport.On("Listen", mock.Anything).Return(nil)
// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
time.Sleep(time.Second)
router.Stop()
mockTransport.AssertExpectations(t)
}
}
func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {