diff --git a/internal/libs/autofile/autofile.go b/internal/libs/autofile/autofile.go index 0bc9a63a3..6f38fc43b 100644 --- a/internal/libs/autofile/autofile.go +++ b/internal/libs/autofile/autofile.go @@ -2,6 +2,8 @@ package autofile import ( "context" + "errors" + "fmt" "os" "os/signal" "path/filepath" @@ -39,6 +41,10 @@ const ( autoFilePerms = os.FileMode(0600) ) +// errAutoFileClosed is reported when operations attempt to use an autofile +// after it has been closed. +var errAutoFileClosed = errors.New("autofile is closed") + // AutoFile automatically closes and re-opens file for writing. The file is // automatically setup to close itself every 1s and upon receiving SIGHUP. // @@ -47,12 +53,12 @@ type AutoFile struct { ID string Path string - closeTicker *time.Ticker - closeTickerStopc chan struct{} // closed when closeTicker is stopped - hupc chan os.Signal + closeTicker *time.Ticker // signals periodic close + cancel func() // cancels the lifecycle context - mtx sync.Mutex - file *os.File + mtx sync.Mutex // guards the fields below + closed bool // true when the the autofile is no longer usable + file *os.File // the underlying file (may be nil) } // OpenAutoFile creates an AutoFile in the path (with random ID). If there is @@ -64,24 +70,28 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) { if err != nil { return nil, err } + + ctx, cancel := context.WithCancel(ctx) af := &AutoFile{ - ID: tmrand.Str(12) + ":" + path, - Path: path, - closeTicker: time.NewTicker(autoFileClosePeriod), - closeTickerStopc: make(chan struct{}), + ID: tmrand.Str(12) + ":" + path, + Path: path, + closeTicker: time.NewTicker(autoFileClosePeriod), + cancel: cancel, } if err := af.openFile(); err != nil { af.Close() return nil, err } - // Close file on SIGHUP. - af.hupc = make(chan os.Signal, 1) - signal.Notify(af.hupc, syscall.SIGHUP) + // Set up a SIGHUP handler to forcibly flush and close the filehandle. + // This forces the next operation to re-open the underlying path. + hupc := make(chan os.Signal, 1) + signal.Notify(hupc, syscall.SIGHUP) go func() { + defer close(hupc) for { select { - case <-af.hupc: + case <-hupc: _ = af.closeFile() case <-ctx.Done(): return @@ -94,42 +104,47 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) { return af, nil } -// Close shuts down the closing goroutine, SIGHUP handler and closes the -// AutoFile. +// Close shuts down the service goroutine and marks af as invalid. Operations +// on af after Close will report an error. func (af *AutoFile) Close() error { - af.closeTicker.Stop() - close(af.closeTickerStopc) - if af.hupc != nil { - close(af.hupc) - } - return af.closeFile() + return af.withLock(func() error { + af.cancel() // signal the close service to stop + af.closed = true // mark the file as invalid + return af.unsyncCloseFile() + }) } func (af *AutoFile) closeFileRoutine(ctx context.Context) { for { select { case <-ctx.Done(): - _ = af.closeFile() + _ = af.Close() return case <-af.closeTicker.C: _ = af.closeFile() - case <-af.closeTickerStopc: - return } } } func (af *AutoFile) closeFile() (err error) { + return af.withLock(af.unsyncCloseFile) +} + +// unsyncCloseFile closes the underlying filehandle if one is open, and reports +// any error it returns. The caller must hold af.mtx exclusively. +func (af *AutoFile) unsyncCloseFile() error { + if fp := af.file; fp != nil { + af.file = nil + return fp.Close() + } + return nil +} + +// withLock runs f while holding af.mtx, and reports any error it returns. +func (af *AutoFile) withLock(f func() error) error { af.mtx.Lock() defer af.mtx.Unlock() - - file := af.file - if file == nil { - return nil - } - - af.file = nil - return file.Close() + return f() } // Write writes len(b) bytes to the AutoFile. It returns the number of bytes @@ -139,6 +154,9 @@ func (af *AutoFile) closeFile() (err error) { func (af *AutoFile) Write(b []byte) (n int, err error) { af.mtx.Lock() defer af.mtx.Unlock() + if af.closed { + return 0, fmt.Errorf("write: %w", errAutoFileClosed) + } if af.file == nil { if err = af.openFile(); err != nil { @@ -153,19 +171,19 @@ func (af *AutoFile) Write(b []byte) (n int, err error) { // Sync commits the current contents of the file to stable storage. Typically, // this means flushing the file system's in-memory copy of recently written // data to disk. -// Opens AutoFile if needed. func (af *AutoFile) Sync() error { - af.mtx.Lock() - defer af.mtx.Unlock() - - if af.file == nil { - if err := af.openFile(); err != nil { - return err + return af.withLock(func() error { + if af.closed { + return fmt.Errorf("sync: %w", errAutoFileClosed) + } else if af.file == nil { + return nil // nothing to sync } - } - return af.file.Sync() + return af.file.Sync() + }) } +// openFile unconditionally replaces af.file with a new filehandle on the path. +// The caller must hold af.mtx exclusively. func (af *AutoFile) openFile() error { file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, autoFilePerms) if err != nil { @@ -188,6 +206,9 @@ func (af *AutoFile) openFile() error { func (af *AutoFile) Size() (int64, error) { af.mtx.Lock() defer af.mtx.Unlock() + if af.closed { + return 0, fmt.Errorf("size: %w", errAutoFileClosed) + } if af.file == nil { if err := af.openFile(); err != nil { diff --git a/internal/libs/autofile/autofile_test.go b/internal/libs/autofile/autofile_test.go index 9864ed82a..dc5ba0682 100644 --- a/internal/libs/autofile/autofile_test.go +++ b/internal/libs/autofile/autofile_test.go @@ -134,7 +134,7 @@ func TestAutoFileSize(t *testing.T) { require.NoError(t, err) // 3. Not existing file - require.NoError(t, af.Close()) + require.NoError(t, af.closeFile()) require.NoError(t, os.Remove(f.Name())) size, err = af.Size() require.EqualValues(t, 0, size, "Expected a new file to be empty") diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go index 0ffc2f04c..4aa6f2cb5 100644 --- a/internal/libs/autofile/group.go +++ b/internal/libs/autofile/group.go @@ -56,6 +56,7 @@ assuming that marker lines are written occasionally. type Group struct { service.BaseService logger log.Logger + ctx context.Context ID string Head *AutoFile // The head AutoFile to write to @@ -92,6 +93,7 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt g := &Group{ logger: logger, + ctx: ctx, ID: "group:" + head.ID, Head: head, headBuf: bufio.NewWriterSize(head, 4096*10), @@ -168,7 +170,7 @@ func (g *Group) Close() { } g.mtx.Lock() - _ = g.Head.closeFile() + _ = g.Head.Close() g.mtx.Unlock() } @@ -304,7 +306,6 @@ func (g *Group) checkTotalSizeLimit() { } // RotateFile causes group to close the current head and assign it some index. -// Note it does not create a new head. func (g *Group) RotateFile() { g.mtx.Lock() defer g.mtx.Unlock() @@ -314,20 +315,20 @@ func (g *Group) RotateFile() { if err := g.headBuf.Flush(); err != nil { panic(err) } - if err := g.Head.Sync(); err != nil { panic(err) } + err := g.Head.withLock(func() error { + if err := g.Head.unsyncCloseFile(); err != nil { + return err + } - if err := g.Head.closeFile(); err != nil { + indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1) + return os.Rename(headPath, indexPath) + }) + if err != nil { panic(err) } - - indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1) - if err := os.Rename(headPath, indexPath); err != nil { - panic(err) - } - g.maxIndex++ } diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 87609cc85..0c256a4f3 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -194,7 +194,10 @@ func (r *Reactor) processPexCh(ctx context.Context) { } // inbound requests for new peers or responses to requests sent by this // reactor - case envelope := <-incoming: + case envelope, ok := <-incoming: + if !ok { + return + } duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope) if err != nil { r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) diff --git a/privval/signer_client_test.go b/privval/signer_client_test.go index b77649684..d8bb25828 100644 --- a/privval/signer_client_test.go +++ b/privval/signer_client_test.go @@ -73,14 +73,15 @@ func TestSignerClose(t *testing.T) { for _, tc := range getSignerTestCases(bctx, t, logger) { t.Run(tc.name, func(t *testing.T) { - t.Cleanup(leaktest.Check(t)) - - defer tc.closer() + defer leaktest.Check(t) + defer func() { + tc.closer() + tc.signerClient.endpoint.Wait() + tc.signerServer.Wait() + }() assert.NoError(t, tc.signerClient.Close()) assert.NoError(t, tc.signerServer.Stop()) - t.Cleanup(tc.signerClient.endpoint.Wait) - t.Cleanup(tc.signerServer.Wait) }) } }