mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-09 05:20:10 +00:00
Merge branch 'master' into wb/remove-privval-reseuse
This commit is contained in:
@@ -2,6 +2,8 @@ package autofile
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
@@ -39,6 +41,10 @@ const (
|
||||
autoFilePerms = os.FileMode(0600)
|
||||
)
|
||||
|
||||
// errAutoFileClosed is reported when operations attempt to use an autofile
|
||||
// after it has been closed.
|
||||
var errAutoFileClosed = errors.New("autofile is closed")
|
||||
|
||||
// AutoFile automatically closes and re-opens file for writing. The file is
|
||||
// automatically setup to close itself every 1s and upon receiving SIGHUP.
|
||||
//
|
||||
@@ -47,12 +53,12 @@ type AutoFile struct {
|
||||
ID string
|
||||
Path string
|
||||
|
||||
closeTicker *time.Ticker
|
||||
closeTickerStopc chan struct{} // closed when closeTicker is stopped
|
||||
hupc chan os.Signal
|
||||
closeTicker *time.Ticker // signals periodic close
|
||||
cancel func() // cancels the lifecycle context
|
||||
|
||||
mtx sync.Mutex
|
||||
file *os.File
|
||||
mtx sync.Mutex // guards the fields below
|
||||
closed bool // true when the the autofile is no longer usable
|
||||
file *os.File // the underlying file (may be nil)
|
||||
}
|
||||
|
||||
// OpenAutoFile creates an AutoFile in the path (with random ID). If there is
|
||||
@@ -64,24 +70,28 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
af := &AutoFile{
|
||||
ID: tmrand.Str(12) + ":" + path,
|
||||
Path: path,
|
||||
closeTicker: time.NewTicker(autoFileClosePeriod),
|
||||
closeTickerStopc: make(chan struct{}),
|
||||
ID: tmrand.Str(12) + ":" + path,
|
||||
Path: path,
|
||||
closeTicker: time.NewTicker(autoFileClosePeriod),
|
||||
cancel: cancel,
|
||||
}
|
||||
if err := af.openFile(); err != nil {
|
||||
af.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Close file on SIGHUP.
|
||||
af.hupc = make(chan os.Signal, 1)
|
||||
signal.Notify(af.hupc, syscall.SIGHUP)
|
||||
// Set up a SIGHUP handler to forcibly flush and close the filehandle.
|
||||
// This forces the next operation to re-open the underlying path.
|
||||
hupc := make(chan os.Signal, 1)
|
||||
signal.Notify(hupc, syscall.SIGHUP)
|
||||
go func() {
|
||||
defer close(hupc)
|
||||
for {
|
||||
select {
|
||||
case <-af.hupc:
|
||||
case <-hupc:
|
||||
_ = af.closeFile()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -94,42 +104,47 @@ func OpenAutoFile(ctx context.Context, path string) (*AutoFile, error) {
|
||||
return af, nil
|
||||
}
|
||||
|
||||
// Close shuts down the closing goroutine, SIGHUP handler and closes the
|
||||
// AutoFile.
|
||||
// Close shuts down the service goroutine and marks af as invalid. Operations
|
||||
// on af after Close will report an error.
|
||||
func (af *AutoFile) Close() error {
|
||||
af.closeTicker.Stop()
|
||||
close(af.closeTickerStopc)
|
||||
if af.hupc != nil {
|
||||
close(af.hupc)
|
||||
}
|
||||
return af.closeFile()
|
||||
return af.withLock(func() error {
|
||||
af.cancel() // signal the close service to stop
|
||||
af.closed = true // mark the file as invalid
|
||||
return af.unsyncCloseFile()
|
||||
})
|
||||
}
|
||||
|
||||
func (af *AutoFile) closeFileRoutine(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = af.closeFile()
|
||||
_ = af.Close()
|
||||
return
|
||||
case <-af.closeTicker.C:
|
||||
_ = af.closeFile()
|
||||
case <-af.closeTickerStopc:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (af *AutoFile) closeFile() (err error) {
|
||||
return af.withLock(af.unsyncCloseFile)
|
||||
}
|
||||
|
||||
// unsyncCloseFile closes the underlying filehandle if one is open, and reports
|
||||
// any error it returns. The caller must hold af.mtx exclusively.
|
||||
func (af *AutoFile) unsyncCloseFile() error {
|
||||
if fp := af.file; fp != nil {
|
||||
af.file = nil
|
||||
return fp.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// withLock runs f while holding af.mtx, and reports any error it returns.
|
||||
func (af *AutoFile) withLock(f func() error) error {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
|
||||
file := af.file
|
||||
if file == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
af.file = nil
|
||||
return file.Close()
|
||||
return f()
|
||||
}
|
||||
|
||||
// Write writes len(b) bytes to the AutoFile. It returns the number of bytes
|
||||
@@ -139,6 +154,9 @@ func (af *AutoFile) closeFile() (err error) {
|
||||
func (af *AutoFile) Write(b []byte) (n int, err error) {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
if af.closed {
|
||||
return 0, fmt.Errorf("write: %w", errAutoFileClosed)
|
||||
}
|
||||
|
||||
if af.file == nil {
|
||||
if err = af.openFile(); err != nil {
|
||||
@@ -153,19 +171,19 @@ func (af *AutoFile) Write(b []byte) (n int, err error) {
|
||||
// Sync commits the current contents of the file to stable storage. Typically,
|
||||
// this means flushing the file system's in-memory copy of recently written
|
||||
// data to disk.
|
||||
// Opens AutoFile if needed.
|
||||
func (af *AutoFile) Sync() error {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
|
||||
if af.file == nil {
|
||||
if err := af.openFile(); err != nil {
|
||||
return err
|
||||
return af.withLock(func() error {
|
||||
if af.closed {
|
||||
return fmt.Errorf("sync: %w", errAutoFileClosed)
|
||||
} else if af.file == nil {
|
||||
return nil // nothing to sync
|
||||
}
|
||||
}
|
||||
return af.file.Sync()
|
||||
return af.file.Sync()
|
||||
})
|
||||
}
|
||||
|
||||
// openFile unconditionally replaces af.file with a new filehandle on the path.
|
||||
// The caller must hold af.mtx exclusively.
|
||||
func (af *AutoFile) openFile() error {
|
||||
file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, autoFilePerms)
|
||||
if err != nil {
|
||||
@@ -188,6 +206,9 @@ func (af *AutoFile) openFile() error {
|
||||
func (af *AutoFile) Size() (int64, error) {
|
||||
af.mtx.Lock()
|
||||
defer af.mtx.Unlock()
|
||||
if af.closed {
|
||||
return 0, fmt.Errorf("size: %w", errAutoFileClosed)
|
||||
}
|
||||
|
||||
if af.file == nil {
|
||||
if err := af.openFile(); err != nil {
|
||||
|
||||
@@ -134,7 +134,7 @@ func TestAutoFileSize(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// 3. Not existing file
|
||||
require.NoError(t, af.Close())
|
||||
require.NoError(t, af.closeFile())
|
||||
require.NoError(t, os.Remove(f.Name()))
|
||||
size, err = af.Size()
|
||||
require.EqualValues(t, 0, size, "Expected a new file to be empty")
|
||||
|
||||
@@ -56,6 +56,7 @@ assuming that marker lines are written occasionally.
|
||||
type Group struct {
|
||||
service.BaseService
|
||||
logger log.Logger
|
||||
ctx context.Context
|
||||
|
||||
ID string
|
||||
Head *AutoFile // The head AutoFile to write to
|
||||
@@ -92,6 +93,7 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
|
||||
|
||||
g := &Group{
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
ID: "group:" + head.ID,
|
||||
Head: head,
|
||||
headBuf: bufio.NewWriterSize(head, 4096*10),
|
||||
@@ -168,7 +170,7 @@ func (g *Group) Close() {
|
||||
}
|
||||
|
||||
g.mtx.Lock()
|
||||
_ = g.Head.closeFile()
|
||||
_ = g.Head.Close()
|
||||
g.mtx.Unlock()
|
||||
}
|
||||
|
||||
@@ -304,7 +306,6 @@ func (g *Group) checkTotalSizeLimit() {
|
||||
}
|
||||
|
||||
// RotateFile causes group to close the current head and assign it some index.
|
||||
// Note it does not create a new head.
|
||||
func (g *Group) RotateFile() {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
@@ -314,20 +315,20 @@ func (g *Group) RotateFile() {
|
||||
if err := g.headBuf.Flush(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := g.Head.Sync(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err := g.Head.withLock(func() error {
|
||||
if err := g.Head.unsyncCloseFile(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := g.Head.closeFile(); err != nil {
|
||||
indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
|
||||
return os.Rename(headPath, indexPath)
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
|
||||
if err := os.Rename(headPath, indexPath); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
g.maxIndex++
|
||||
}
|
||||
|
||||
|
||||
@@ -194,7 +194,10 @@ func (r *Reactor) processPexCh(ctx context.Context) {
|
||||
}
|
||||
// inbound requests for new peers or responses to requests sent by this
|
||||
// reactor
|
||||
case envelope := <-incoming:
|
||||
case envelope, ok := <-incoming:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
|
||||
|
||||
@@ -73,14 +73,15 @@ func TestSignerClose(t *testing.T) {
|
||||
|
||||
for _, tc := range getSignerTestCases(bctx, t, logger) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Cleanup(leaktest.Check(t))
|
||||
|
||||
defer tc.closer()
|
||||
defer leaktest.Check(t)
|
||||
defer func() {
|
||||
tc.closer()
|
||||
tc.signerClient.endpoint.Wait()
|
||||
tc.signerServer.Wait()
|
||||
}()
|
||||
|
||||
assert.NoError(t, tc.signerClient.Close())
|
||||
assert.NoError(t, tc.signerServer.Stop())
|
||||
t.Cleanup(tc.signerClient.endpoint.Wait)
|
||||
t.Cleanup(tc.signerServer.Wait)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user