From 417166704a0b2417cd947a86300c66d70e98ee30 Mon Sep 17 00:00:00 2001
From: "M. J. Fromberger"
Date: Tue, 18 Jan 2022 12:01:04 -0800
Subject: [PATCH 1/4] pex: do not send nil envelopes to the reactor (#7622)

---
 internal/p2p/pex/reactor.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go
index 87609cc85..2faa3130d 100644
--- a/internal/p2p/pex/reactor.go
+++ b/internal/p2p/pex/reactor.go
@@ -171,10 +171,14 @@ func (r *Reactor) processPexCh(ctx context.Context) {
 		defer close(incoming)
 		iter := r.pexCh.Receive(ctx)
 		for iter.Next(ctx) {
+			env := iter.Envelope()
+			if env == nil {
+				break
+			}
 			select {
 			case <-ctx.Done():
 				return
-			case incoming <- iter.Envelope():
+			case incoming <- env:
 			}
 		}
 	}()

From 5cca45bb457b5b4df1dbae3af46b44301dae2dbe Mon Sep 17 00:00:00 2001
From: "M. J. Fromberger"
Date: Tue, 18 Jan 2022 14:32:22 -0800
Subject: [PATCH 2/4] pex: improve handling of closed channels (#7623)

Reverts and improves on #7622. The problem turns out not to be on the
PEX channel side, but on the pass-through (Go) channel.
---
 internal/p2p/pex/reactor.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go
index 2faa3130d..0c256a4f3 100644
--- a/internal/p2p/pex/reactor.go
+++ b/internal/p2p/pex/reactor.go
@@ -171,14 +171,10 @@ func (r *Reactor) processPexCh(ctx context.Context) {
 		defer close(incoming)
 		iter := r.pexCh.Receive(ctx)
 		for iter.Next(ctx) {
-			env := iter.Envelope()
-			if env == nil {
-				break
-			}
 			select {
 			case <-ctx.Done():
 				return
-			case incoming <- env:
+			case incoming <- iter.Envelope():
 			}
 		}
 	}()
@@ -198,7 +194,10 @@ func (r *Reactor) processPexCh(ctx context.Context) {
 		}
 		// inbound requests for new peers or responses to requests sent by this
 		// reactor
-		case envelope := <-incoming:
+		case envelope, ok := <-incoming:
+			if !ok {
+				return
+			}
 			duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope)
 			if err != nil {
 				r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
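
The two PEX changes above both hinge on a standard property of Go channels:
a receive from a closed channel never blocks and yields the element type's
zero value (nil for a pointer) unless the comma-ok form is used to detect
closure. The following standalone snippet is illustrative only (it uses a
made-up envelope type rather than the real p2p.Envelope), but it shows the
behavior the #7623 fix guards against:

    package main

    import "fmt"

    // envelope is a stand-in for the message type handed between goroutines.
    type envelope struct{ payload string }

    func main() {
        ch := make(chan *envelope, 1)
        ch <- &envelope{payload: "hello"}
        close(ch)

        // A value buffered before close is still delivered.
        e, ok := <-ch
        fmt.Println(e.payload, ok) // hello true

        // Once drained, the closed channel returns the zero value (nil)
        // with ok == false instead of blocking, so the receiver must check
        // ok before dereferencing the value.
        e, ok = <-ch
        fmt.Println(e == nil, ok) // true false
    }
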
From a7eb95065d0badc1520cb64d10292392fa6f95ed Mon Sep 17 00:00:00 2001
From: "M. J. Fromberger"
Date: Tue, 18 Jan 2022 14:57:20 -0800
Subject: [PATCH 3/4] autofile: ensure files are not reopened after closing (#7628)

During file rotation and WAL shutdown, there was a race condition between
users of an autofile and its termination. To fix this, ensure operations on
an autofile are properly synchronized, and report errors when attempting to
use an autofile after it was closed. Notably:

- Simplify the cancellation protocol between the signal handler and Close.
- Exclude writers to an autofile during rotation.
- Add documentation about what is going on.

There is a lot more that could be improved here, but this addresses the more
obvious races that have been panicking unit tests.
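
To illustrate the approach in a self-contained way (simplified, hypothetical
names, not the actual AutoFile API): every operation takes a single mutex,
checks a closed flag, and reports a sentinel error once Close has run, so
the resource can never be silently re-opened after shutdown.

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    var errClosed = errors.New("resource is closed")

    // guarded shows the shape of the synchronization: one mutex protects
    // both the closed flag and the state it guards.
    type guarded struct {
        mu     sync.Mutex
        closed bool
        data   []string
    }

    func (g *guarded) Write(s string) error {
        g.mu.Lock()
        defer g.mu.Unlock()
        if g.closed {
            return fmt.Errorf("write: %w", errClosed)
        }
        g.data = append(g.data, s)
        return nil
    }

    func (g *guarded) Close() error {
        g.mu.Lock()
        defer g.mu.Unlock()
        g.closed = true // later operations fail instead of re-opening state
        return nil
    }

    func main() {
        g := &guarded{}
        fmt.Println(g.Write("a")) // <nil>
        fmt.Println(g.Close())    // <nil>
        fmt.Println(g.Write("b")) // write: resource is closed
    }
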
---
 internal/libs/autofile/autofile.go      | 103 ++++++++++++++----------
 internal/libs/autofile/autofile_test.go |   2 +-
 internal/libs/autofile/group.go         |  21 ++---
 3 files changed, 74 insertions(+), 52 deletions(-)

diff --git a/internal/libs/autofile/autofile.go b/internal/libs/autofile/autofile.go
index 0bc9a63a3..6f38fc43b 100644
--- a/internal/libs/autofile/autofile.go
+++ b/internal/libs/autofile/autofile.go
@@ -2,6 +2,8 @@ package autofile
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"os"
 	"os/signal"
 	"path/filepath"
@@ -39,6 +41,10 @@ const (
 	autoFilePerms = os.FileMode(0600)
 )
 
+// errAutoFileClosed is reported when operations attempt to use an autofile
+// after it has been closed.
+var errAutoFileClosed = errors.New("autofile is closed")
+
 // AutoFile automatically closes and re-opens file for writing. The file is
 // automatically setup to close itself every 1s and upon receiving SIGHUP.
 //
@@ -47,12 +53,12 @@ type AutoFile struct {
 	ID   string
 	Path string
 
-	closeTicker      *time.Ticker
-	closeTickerStopc chan struct{} // closed when closeTicker is stopped
-	hupc             chan os.Signal
+	closeTicker *time.Ticker // signals periodic close
+	cancel      func()       // cancels the lifecycle context
 
-	mtx  sync.Mutex
-	file *os.File
+	mtx    sync.Mutex // guards the fields below
+	closed bool       // true when the autofile is no longer usable
+	file   *os.File   // the underlying file (may be nil)
 }
 
 // OpenAutoFile creates an AutoFile in the path (with random ID). If there is
@@ -64,24 +70,28 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	ctx, cancel := context.WithCancel(ctx)
 	af := &AutoFile{
-		ID:               tmrand.Str(12) + ":" + path,
-		Path:             path,
-		closeTicker:      time.NewTicker(autoFileClosePeriod),
-		closeTickerStopc: make(chan struct{}),
+		ID:          tmrand.Str(12) + ":" + path,
+		Path:        path,
+		closeTicker: time.NewTicker(autoFileClosePeriod),
+		cancel:      cancel,
 	}
 	if err := af.openFile(); err != nil {
 		af.Close()
 		return nil, err
 	}
 
-	// Close file on SIGHUP.
-	af.hupc = make(chan os.Signal, 1)
-	signal.Notify(af.hupc, syscall.SIGHUP)
+	// Set up a SIGHUP handler to forcibly flush and close the filehandle.
+	// This forces the next operation to re-open the underlying path.
+	hupc := make(chan os.Signal, 1)
+	signal.Notify(hupc, syscall.SIGHUP)
 	go func() {
+		defer close(hupc)
 		for {
 			select {
-			case <-af.hupc:
+			case <-hupc:
 				_ = af.closeFile()
 			case <-ctx.Done():
 				return
@@ -94,42 +104,47 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) {
 	return af, nil
 }
 
-// Close shuts down the closing goroutine, SIGHUP handler and closes the
-// AutoFile.
+// Close shuts down the service goroutine and marks af as invalid. Operations
+// on af after Close will report an error.
 func (af *AutoFile) Close() error {
-	af.closeTicker.Stop()
-	close(af.closeTickerStopc)
-	if af.hupc != nil {
-		close(af.hupc)
-	}
-	return af.closeFile()
+	return af.withLock(func() error {
+		af.cancel()      // signal the close service to stop
+		af.closed = true // mark the file as invalid
+		return af.unsyncCloseFile()
+	})
 }
 
 func (af *AutoFile) closeFileRoutine(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			_ = af.closeFile()
+			_ = af.Close()
 			return
 		case <-af.closeTicker.C:
 			_ = af.closeFile()
-		case <-af.closeTickerStopc:
-			return
 		}
 	}
 }
 
 func (af *AutoFile) closeFile() (err error) {
+	return af.withLock(af.unsyncCloseFile)
+}
+
+// unsyncCloseFile closes the underlying filehandle if one is open, and reports
+// any error it returns. The caller must hold af.mtx exclusively.
+func (af *AutoFile) unsyncCloseFile() error {
+	if fp := af.file; fp != nil {
+		af.file = nil
+		return fp.Close()
+	}
+	return nil
+}
+
+// withLock runs f while holding af.mtx, and reports any error it returns.
+func (af *AutoFile) withLock(f func() error) error {
 	af.mtx.Lock()
 	defer af.mtx.Unlock()
-
-	file := af.file
-	if file == nil {
-		return nil
-	}
-
-	af.file = nil
-	return file.Close()
+	return f()
 }
 
 // Write writes len(b) bytes to the AutoFile. It returns the number of bytes
@@ -139,6 +154,9 @@ func (af *AutoFile) closeFile() (err error) {
 func (af *AutoFile) Write(b []byte) (n int, err error) {
 	af.mtx.Lock()
 	defer af.mtx.Unlock()
+	if af.closed {
+		return 0, fmt.Errorf("write: %w", errAutoFileClosed)
+	}
 
 	if af.file == nil {
 		if err = af.openFile(); err != nil {
@@ -153,19 +171,19 @@ func (af *AutoFile) Write(b []byte) (n int, err error) {
 // Sync commits the current contents of the file to stable storage. Typically,
 // this means flushing the file system's in-memory copy of recently written
 // data to disk.
-// Opens AutoFile if needed.
 func (af *AutoFile) Sync() error {
-	af.mtx.Lock()
-	defer af.mtx.Unlock()
-
-	if af.file == nil {
-		if err := af.openFile(); err != nil {
-			return err
+	return af.withLock(func() error {
+		if af.closed {
+			return fmt.Errorf("sync: %w", errAutoFileClosed)
+		} else if af.file == nil {
+			return nil // nothing to sync
 		}
-	}
-	return af.file.Sync()
+		return af.file.Sync()
+	})
 }
 
+// openFile unconditionally replaces af.file with a new filehandle on the path.
+// The caller must hold af.mtx exclusively.
 func (af *AutoFile) openFile() error {
 	file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, autoFilePerms)
 	if err != nil {
@@ -188,6 +206,9 @@ func (af *AutoFile) openFile() error {
 func (af *AutoFile) Size() (int64, error) {
 	af.mtx.Lock()
 	defer af.mtx.Unlock()
+	if af.closed {
+		return 0, fmt.Errorf("size: %w", errAutoFileClosed)
+	}
 
 	if af.file == nil {
 		if err := af.openFile(); err != nil {
diff --git a/internal/libs/autofile/autofile_test.go b/internal/libs/autofile/autofile_test.go
index 9864ed82a..dc5ba0682 100644
--- a/internal/libs/autofile/autofile_test.go
+++ b/internal/libs/autofile/autofile_test.go
@@ -134,7 +134,7 @@ func TestAutoFileSize(t *testing.T) {
 	require.NoError(t, err)
 
 	// 3. Not existing file
-	require.NoError(t, af.Close())
+	require.NoError(t, af.closeFile())
 	require.NoError(t, os.Remove(f.Name()))
 	size, err = af.Size()
 	require.EqualValues(t, 0, size, "Expected a new file to be empty")
diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go
index 0ffc2f04c..4aa6f2cb5 100644
--- a/internal/libs/autofile/group.go
+++ b/internal/libs/autofile/group.go
@@ -56,6 +56,7 @@ assuming that marker lines are written occasionally.
 type Group struct {
 	service.BaseService
 	logger log.Logger
+	ctx    context.Context
 
 	ID   string
 	Head *AutoFile // The head AutoFile to write to
@@ -92,6 +93,7 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
 
 	g := &Group{
 		logger:  logger,
+		ctx:     ctx,
 		ID:      "group:" + head.ID,
 		Head:    head,
 		headBuf: bufio.NewWriterSize(head, 4096*10),
@@ -168,7 +170,7 @@ func (g *Group) Close() {
 	}
 
 	g.mtx.Lock()
-	_ = g.Head.closeFile()
+	_ = g.Head.Close()
 	g.mtx.Unlock()
 }
 
@@ -304,7 +306,6 @@ func (g *Group) checkTotalSizeLimit() {
 }
 
 // RotateFile causes group to close the current head and assign it some index.
-// Note it does not create a new head.
 func (g *Group) RotateFile() {
 	g.mtx.Lock()
 	defer g.mtx.Unlock()
@@ -314,20 +315,20 @@ func (g *Group) RotateFile() {
 	if err := g.headBuf.Flush(); err != nil {
 		panic(err)
 	}
-
 	if err := g.Head.Sync(); err != nil {
 		panic(err)
 	}
+	err := g.Head.withLock(func() error {
+		if err := g.Head.unsyncCloseFile(); err != nil {
+			return err
+		}
 
-	if err := g.Head.closeFile(); err != nil {
+		indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
+		return os.Rename(headPath, indexPath)
+	})
+	if err != nil {
 		panic(err)
 	}
-
-	indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
-	if err := os.Rename(headPath, indexPath); err != nil {
-		panic(err)
-	}
-
 	g.maxIndex++
 }
 
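
One more illustration of the rotation change above, with hypothetical names
rather than the real Group/AutoFile types: the close and the rename happen
inside a single critical section on the same mutex writers must hold, so no
write can land on the old path between the two steps.

    package main

    import (
        "log"
        "os"
        "sync"
    )

    // rotate closes the current file and renames it to archive while holding
    // mu, the mutex that writers also take before touching the file.
    func rotate(mu *sync.Mutex, f **os.File, path, archive string) error {
        mu.Lock()
        defer mu.Unlock()

        if *f != nil {
            if err := (*f).Close(); err != nil {
                return err
            }
            *f = nil // the next writer must re-open the (new) path
        }
        // Still holding mu: no writer can re-create path before the rename,
        // so the archived file contains exactly what was written so far.
        return os.Rename(path, archive)
    }

    func main() {
        var mu sync.Mutex
        f, err := os.CreateTemp("", "head-*.log")
        if err != nil {
            log.Fatal(err)
        }
        archive := f.Name() + ".0"
        if err := rotate(&mu, &f, f.Name(), archive); err != nil {
            log.Fatal(err)
        }
        _ = os.Remove(archive) // clean up the demo file
    }
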
From 5eae2e62c0bcf359c2c54a345513cb56aedab070 Mon Sep 17 00:00:00 2001
From: "M. J. Fromberger"
Date: Tue, 18 Jan 2022 16:36:09 -0800
Subject: [PATCH 4/4] privval: synchronize leak check with shutdown (#7629)

The interaction between defers and t.Cleanup can be delicate. For this
case, which regularly flakes in CI, be explicit: defer the closes and
waits before making any attempt to run the leak check.
---
 privval/signer_client_test.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/privval/signer_client_test.go b/privval/signer_client_test.go
index b77649684..d8bb25828 100644
--- a/privval/signer_client_test.go
+++ b/privval/signer_client_test.go
@@ -73,14 +73,15 @@ func TestSignerClose(t *testing.T) {
 	for _, tc := range getSignerTestCases(bctx, t, logger) {
 		t.Run(tc.name, func(t *testing.T) {
-			t.Cleanup(leaktest.Check(t))
-
-			defer tc.closer()
+			defer leaktest.Check(t)()
+			defer func() {
+				tc.closer()
+				tc.signerClient.endpoint.Wait()
+				tc.signerServer.Wait()
+			}()
 
 			assert.NoError(t, tc.signerClient.Close())
 			assert.NoError(t, tc.signerServer.Stop())
 
-			t.Cleanup(tc.signerClient.endpoint.Wait)
-			t.Cleanup(tc.signerServer.Wait)
 		})
 	}
 }
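
A final illustration of why the ordering in the last hunk works (a
standalone sketch, not the real test): deferred calls run in LIFO order, so
a leak check deferred first fires only after the later-deferred shutdown
routine has finished closing and waiting.

    package main

    import "fmt"

    // check stands in for a leak detector in the style of leaktest.Check:
    // it returns a func, and that returned func performs the verification.
    func check() func() {
        fmt.Println("leak check armed")
        return func() { fmt.Println("leak check runs last") }
    }

    func main() {
        defer check()() // registered first, so the returned func runs last
        defer func() {  // registered second, so it runs first
            fmt.Println("shutdown: close and wait for goroutines")
        }()
        fmt.Println("test body")
    }

    // Output:
    // leak check armed
    // test body
    // shutdown: close and wait for goroutines
    // leak check runs last
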