From 979a6a1b13331ae90efb1c03b9dc69bc227d1909 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 14 Jun 2022 19:12:53 -0400 Subject: [PATCH] p2p: accept should not abort on first error (#8759) --- internal/p2p/router.go | 17 ++++--- internal/p2p/router_test.go | 96 +++++++++++++------------------------ 2 files changed, 43 insertions(+), 70 deletions(-) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 8b77541de..511fa0fb9 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -470,14 +470,17 @@ func (r *Router) dialSleep(ctx context.Context) { func (r *Router) acceptPeers(ctx context.Context, transport Transport) { for { conn, err := transport.Accept(ctx) - switch err { - case nil: - case io.EOF: - r.logger.Debug("stopping accept routine", "transport", transport) + switch { + case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): + r.logger.Debug("stopping accept routine", "transport", transport, "err", "context canceled") return - default: + case errors.Is(err, io.EOF): + r.logger.Debug("stopping accept routine", "transport", transport, "err", "EOF") + return + case err != nil: + // in this case we got an error from the net.Listener. r.logger.Error("failed to accept connection", "transport", transport, "err", err) - return + continue } incomingIP := conn.RemoteEndpoint().IP @@ -489,7 +492,7 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) { "close_err", closeErr, ) - return + continue } // Spawn a goroutine for the handshake, to avoid head-of-line blocking. diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 663e6b81c..e0910fb21 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -442,78 +442,48 @@ func TestRouter_AcceptPeers(t *testing.T) { } } -func TestRouter_AcceptPeers_Error(t *testing.T) { - t.Cleanup(leaktest.Check(t)) +func TestRouter_AcceptPeers_Errors(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + for _, err := range []error{io.EOF, context.Canceled, context.DeadlineExceeded} { + t.Run(err.Error(), func(t *testing.T) { + t.Cleanup(leaktest.Check(t)) - // Set up a mock transport that returns an error, which should prevent - // the router from calling Accept again. - mockTransport := &mocks.Transport{} - mockTransport.On("String").Maybe().Return("mock") - mockTransport.On("Accept", mock.Anything).Once().Return(nil, errors.New("boom")) - mockTransport.On("Close").Return(nil) - mockTransport.On("Listen", mock.Anything).Return(nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // Set up and start the router. - peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) - require.NoError(t, err) + // Set up a mock transport that returns io.EOF once, which should prevent + // the router from calling Accept again. + mockTransport := &mocks.Transport{} + mockTransport.On("String").Maybe().Return("mock") + mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF) + mockTransport.On("Close").Return(nil) + mockTransport.On("Listen", mock.Anything).Return(nil) - router, err := p2p.NewRouter( - log.NewNopLogger(), - p2p.NopMetrics(), - selfKey, - peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, - ) - require.NoError(t, err) + // Set up and start the router. + peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + require.NoError(t, err) - require.NoError(t, router.Start(ctx)) - time.Sleep(time.Second) - router.Stop() + router, err := p2p.NewRouter( + log.NewNopLogger(), + p2p.NopMetrics(), + selfKey, + peerManager, + func() *types.NodeInfo { return &selfInfo }, + mockTransport, + nil, + p2p.RouterOptions{}, + ) + require.NoError(t, err) - mockTransport.AssertExpectations(t) -} + require.NoError(t, router.Start(ctx)) + time.Sleep(time.Second) + router.Stop() -func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { - t.Cleanup(leaktest.Check(t)) + mockTransport.AssertExpectations(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + }) - // Set up a mock transport that returns io.EOF once, which should prevent - // the router from calling Accept again. - mockTransport := &mocks.Transport{} - mockTransport.On("String").Maybe().Return("mock") - mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF) - mockTransport.On("Close").Return(nil) - mockTransport.On("Listen", mock.Anything).Return(nil) - - // Set up and start the router. - peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) - require.NoError(t, err) - - router, err := p2p.NewRouter( - log.NewNopLogger(), - p2p.NopMetrics(), - selfKey, - peerManager, - func() *types.NodeInfo { return &selfInfo }, - mockTransport, - nil, - p2p.RouterOptions{}, - ) - require.NoError(t, err) - - require.NoError(t, router.Start(ctx)) - time.Sleep(time.Second) - router.Stop() - - mockTransport.AssertExpectations(t) + } } func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {