posix: fix fallback tempfile naming

This commit is contained in:
Ben McClelland
2023-05-31 04:09:47 +00:00
parent 4d7c12def3
commit 981894aef2
3 changed files with 83 additions and 43 deletions

View File

@@ -251,7 +251,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [
}
}
f, err := openTmpFile(metaTmpDir, object, 0)
f, err := openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, 0)
if err != nil {
return nil, fmt.Errorf("open temp file: %w", err)
}
@@ -686,10 +686,11 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length
}
sum := sha256.Sum256([]byte(object))
objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part))
f, err := openTmpFile(objdir, partPath, length)
f, err := openTmpFile(filepath.Join(bucket, objdir),
bucket, partPath, length)
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
}
@@ -742,7 +743,8 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) {
xattr.Set(name, dirObjKey, nil)
} else {
// object is file
f, err := openTmpFile(metaTmpDir, *po.Key, po.ContentLength)
f, err := openTmpFile(filepath.Join(*po.Bucket, metaTmpDir),
*po.Bucket, *po.Key, po.ContentLength)
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
}

View File

@@ -20,30 +20,39 @@ import (
"fmt"
"io/fs"
"os"
"path/filepath"
)
// tmpfile is an in-progress upload backed by a named temporary file. The
// data is written to f and later moved into place by link.
type tmpfile struct {
// f is the open temporary file created by os.CreateTemp in openTmpFile.
f *os.File
// bucket is joined with objname to form the final object path in link.
bucket string
// objname is the object name as passed to openTmpFile; it is also the
// input hashed to build the temp file name.
objname string
// size starts as the size passed to openTmpFile and is reduced by the
// number of bytes written on every Write, so it holds the bytes still
// allowed.
size int64
}
func openTmpFile(dir, obj string, size int64) (*tmpfile, error) {
func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) {
// Create a temp file for upload while in progress (see link comments below).
f, err := os.CreateTemp(dir,
fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj))))
fmt.Sprintf("%x.", sha256.Sum256([]byte(obj))))
if err != nil {
return nil, err
}
return &tmpfile{f: f, objname: obj}, nil
return &tmpfile{f: f, bucket: bucket, objname: obj, size: size}, nil
}
func (tmp *tmpfile) link() error {
tempname := tmp.f.Name()
// cleanup in case anything goes wrong, if rename succeeds then
// this will no longer exist
defer os.Remove(tempname)
// We use Rename as the atomic operation for object puts. The upload is
// written to a temp file to not conflict with any other simultaneous
// uploads. The final operation is to move the temp file into place for
// the object. This ensures the object semantics of last upload completed
// wins and is not some combination of writes from simultaneous uploads.
err := os.Remove(tmp.objname)
objPath := filepath.Join(tmp.bucket, tmp.objname)
err := os.Remove(objPath)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove stale path: %w", err)
}
@@ -53,7 +62,7 @@ func (tmp *tmpfile) link() error {
return fmt.Errorf("close tmpfile: %w", err)
}
err = os.Rename(tmp.f.Name(), tmp.objname)
err = os.Rename(tempname, objPath)
if err != nil {
return fmt.Errorf("rename tmpfile: %w", err)
}
@@ -62,7 +71,13 @@ func (tmp *tmpfile) link() error {
}
// Write appends b to the temp file while enforcing the remaining byte
// allowance held in tmp.size.
//
// A buffer longer than the remaining allowance is rejected before anything is
// written (0 and an error are returned). Otherwise the bytes actually written
// are deducted from tmp.size and the result of the underlying file write is
// returned unchanged.
func (tmp *tmpfile) Write(b []byte) (int, error) {
	if int64(len(b)) > tmp.size {
		return 0, fmt.Errorf("write exceeds content length")
	}

	n, err := tmp.f.Write(b)
	// Deduct only what was really written so a short write keeps the
	// accounting accurate.
	tmp.size -= int64(n)
	return n, err
}
func (tmp *tmpfile) cleanup() {

View File

@@ -31,11 +31,13 @@ const procfddir = "/proc/self/fd"
// tmpfile is an in-progress upload. It is backed either by an unnamed
// O_TMPFILE file or, when that is not supported, by a named temp file created
// with os.CreateTemp.
// NOTE(review): the O_TMPFILE construction visible in openTmpFile sets only
// f, isOTmp and size, while link joins bucket and objname to build the
// destination path — confirm bucket/objname are populated on that path.
type tmpfile struct {
// f is the open file the upload data is written to.
f *os.File
// bucket is joined with objname to form the final object path in link.
bucket string
// objname is the object name as passed to openTmpFile.
objname string
// isOTmp is true when f was opened via O_TMPFILE; link uses fallbackLink
// when it is false.
isOTmp bool
// size is the value passed to Fallocate by falloc and is reduced by the
// number of bytes written on every Write.
size int64
}
func openTmpFile(dir, obj string, size int64) (*tmpfile, error) {
func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) {
// O_TMPFILE allows for a file handle to an unnamed file in the filesystem.
// This can help reduce contention within the namespace (parent directories),
// etc. And will auto cleanup the inode on close if we never link this
@@ -46,29 +48,32 @@ func openTmpFile(dir, obj string, size int64) (*tmpfile, error) {
if err != nil {
// O_TMPFILE not supported, try fallback
f, err := os.CreateTemp(dir,
fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj))))
fmt.Sprintf("%x.", sha256.Sum256([]byte(obj))))
if err != nil {
return nil, err
}
tmp := &tmpfile{f: f}
tmp := &tmpfile{f: f, bucket: bucket, objname: obj, size: size}
// falloc is best effort, it's fine if this fails
if size > 0 {
tmp.falloc(size)
tmp.falloc()
}
return tmp, nil
}
// for O_TMPFILE, filename is /proc/self/fd/<fd> to be used
// later to link file into namespace
f := os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd)))
tmp := &tmpfile{f: f, isOTmp: true}
tmp := &tmpfile{f: f, isOTmp: true, size: size}
// falloc is best effort, it's fine if this fails
if size > 0 {
tmp.falloc(size)
tmp.falloc()
}
return tmp, nil
}
func (tmp *tmpfile) falloc(size int64) error {
err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, size)
func (tmp *tmpfile) falloc() error {
err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, tmp.size)
if err != nil {
return fmt.Errorf("fallocate: %v", err)
}
@@ -82,36 +87,33 @@ func (tmp *tmpfile) link() error {
// temp file into place for the object. This ensures the object semantics
// of last upload completed wins and is not some combination of writes
// from simultaneous uploads.
err := os.Remove(tmp.objname)
objPath := filepath.Join(tmp.bucket, tmp.objname)
err := os.Remove(objPath)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove stale path: %w", err)
}
if tmp.isOTmp {
procdir, err := os.Open(procfddir)
if err != nil {
return fmt.Errorf("open proc dir: %w", err)
}
defer procdir.Close()
if !tmp.isOTmp {
// O_TMPFILE not supported, use fallback
return tmp.fallbackLink()
}
dir, err := os.Open(filepath.Dir(tmp.objname))
if err != nil {
return fmt.Errorf("open parent dir: %w", err)
}
defer dir.Close()
procdir, err := os.Open(procfddir)
if err != nil {
return fmt.Errorf("open proc dir: %w", err)
}
defer procdir.Close()
err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
int(dir.Fd()), filepath.Base(tmp.objname), unix.AT_SYMLINK_FOLLOW)
if err != nil {
return fmt.Errorf("link tmpfile: %w", err)
}
dir, err := os.Open(filepath.Dir(objPath))
if err != nil {
return fmt.Errorf("open parent dir: %w", err)
}
defer dir.Close()
err = tmp.f.Close()
if err != nil {
return fmt.Errorf("close tmpfile: %w", err)
}
return nil
err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
int(dir.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW)
if err != nil {
return fmt.Errorf("link tmpfile: %w", err)
}
err = tmp.f.Close()
@@ -119,7 +121,22 @@ func (tmp *tmpfile) link() error {
return fmt.Errorf("close tmpfile: %w", err)
}
err = os.Rename(tmp.f.Name(), tmp.objname)
return nil
}
func (tmp *tmpfile) fallbackLink() error {
tempname := tmp.f.Name()
// cleanup in case anything goes wrong, if rename succeeds then
// this will no longer exist
defer os.Remove(tempname)
err := tmp.f.Close()
if err != nil {
return fmt.Errorf("close tmpfile: %w", err)
}
objPath := filepath.Join(tmp.bucket, tmp.objname)
err = os.Rename(tempname, objPath)
if err != nil {
return fmt.Errorf("rename tmpfile: %w", err)
}
@@ -128,7 +145,13 @@ func (tmp *tmpfile) link() error {
}
// Write appends b to the temp file while enforcing the remaining byte
// allowance held in tmp.size.
//
// A buffer longer than the remaining allowance is rejected before anything is
// written (0 and an error are returned). Otherwise the bytes actually written
// are deducted from tmp.size and the result of the underlying file write is
// returned unchanged.
func (tmp *tmpfile) Write(b []byte) (int, error) {
	if int64(len(b)) > tmp.size {
		return 0, fmt.Errorf("write exceeds content length")
	}

	n, err := tmp.f.Write(b)
	// Deduct only what was really written so a short write keeps the
	// accounting accurate.
	tmp.size -= int64(n)
	return n, err
}
func (tmp *tmpfile) cleanup() {