Merge pull request #47 from versity/ben/posix_tmp

posix: fix fallback tempfile naming
This commit is contained in:
Ben McClelland
2023-05-30 21:48:02 -07:00
committed by GitHub
3 changed files with 131 additions and 80 deletions

View File

@@ -251,7 +251,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [
}
}
f, err := openTmpFile(metaTmpDir, object, 0)
f, err := openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, 0)
if err != nil {
return nil, fmt.Errorf("open temp file: %w", err)
}
@@ -686,10 +686,11 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length
}
sum := sha256.Sum256([]byte(object))
objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part))
f, err := openTmpFile(objdir, partPath, length)
f, err := openTmpFile(filepath.Join(bucket, objdir),
bucket, partPath, length)
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
}
@@ -725,8 +726,6 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) {
name := filepath.Join(*po.Bucket, *po.Key)
etag := ""
if strings.HasSuffix(*po.Key, "/") {
// object is directory
err = mkdirAll(name, os.FileMode(0755), *po.Bucket, *po.Key)
@@ -740,46 +739,52 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) {
// set our attribute that this dir was specifically put
xattr.Set(name, dirObjKey, nil)
} else {
// object is file
f, err := openTmpFile(metaTmpDir, *po.Key, po.ContentLength)
// TODO: what etag should be returned here
// and we should set etag xattr to identify dir was
// specifically uploaded
return "", nil
}
// object is file
f, err := openTmpFile(filepath.Join(*po.Bucket, metaTmpDir),
*po.Bucket, *po.Key, po.ContentLength)
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
hash := md5.New()
rdr := io.TeeReader(po.Body, hash)
_, err = io.Copy(f, rdr)
if err != nil {
return "", fmt.Errorf("write object data: %w", err)
}
dir := filepath.Dir(name)
if dir != "" {
err = mkdirAll(dir, os.FileMode(0755), *po.Bucket, *po.Key)
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
return "", fmt.Errorf("make object parent directories: %w", err)
}
defer f.cleanup()
}
hash := md5.New()
rdr := io.TeeReader(po.Body, hash)
_, err = io.Copy(f, rdr)
err = f.link()
if err != nil {
return "", fmt.Errorf("link object in namespace: %w", err)
}
for k, v := range po.Metadata {
xattr.Set(name, "user."+k, []byte(v))
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum[:])
xattr.Set(name, "user.etag", []byte(etag))
if newObjUID != 0 || newObjGID != 0 {
err = os.Chown(name, newObjUID, newObjGID)
if err != nil {
return "", fmt.Errorf("write object data: %w", err)
}
dir := filepath.Dir(name)
if dir != "" {
err = mkdirAll(dir, os.FileMode(0755), *po.Bucket, *po.Key)
if err != nil {
return "", fmt.Errorf("make object parent directories: %w", err)
}
}
err = f.link()
if err != nil {
return "", fmt.Errorf("link object in namespace: %w", err)
}
for k, v := range po.Metadata {
xattr.Set(name, "user."+k, []byte(v))
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum[:])
xattr.Set(name, "user.etag", []byte(etag))
if newObjUID != 0 || newObjGID != 0 {
err = os.Chown(name, newObjUID, newObjGID)
if err != nil {
return "", fmt.Errorf("set object uid/gid: %v", err)
}
return "", fmt.Errorf("set object uid/gid: %v", err)
}
}

View File

@@ -20,30 +20,43 @@ import (
"fmt"
"io/fs"
"os"
"path/filepath"
)
type tmpfile struct {
f *os.File
bucket string
objname string
size int64
}
func openTmpFile(dir, obj string, size int64) (*tmpfile, error) {
func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) {
// Create a temp file for upload while in progress (see link comments below).
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, fmt.Errorf("make temp dir: %w", err)
}
f, err := os.CreateTemp(dir,
fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj))))
fmt.Sprintf("%x.", sha256.Sum256([]byte(obj))))
if err != nil {
return nil, err
}
return &tmpfile{f: f, objname: obj}, nil
return &tmpfile{f: f, bucket: bucket, objname: obj, size: size}, nil
}
func (tmp *tmpfile) link() error {
tempname := tmp.f.Name()
// cleanup in case anything goes wrong, if rename succeeds then
// this will no longer exist
defer os.Remove(tempname)
// We use Rename as the atomic operation for object puts. The upload is
// written to a temp file to not conflict with any other simultaneous
// uploads. The final operation is to move the temp file into place for
// the object. This ensures the object semantics of last upload completed
// wins and is not some combination of writes from simultaneous uploads.
err := os.Remove(tmp.objname)
objPath := filepath.Join(tmp.bucket, tmp.objname)
err := os.Remove(objPath)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove stale path: %w", err)
}
@@ -53,7 +66,7 @@ func (tmp *tmpfile) link() error {
return fmt.Errorf("close tmpfile: %w", err)
}
err = os.Rename(tmp.f.Name(), tmp.objname)
err = os.Rename(tempname, objPath)
if err != nil {
return fmt.Errorf("rename tmpfile: %w", err)
}
@@ -62,7 +75,13 @@ func (tmp *tmpfile) link() error {
}
func (tmp *tmpfile) Write(b []byte) (int, error) {
return tmp.f.Write(b)
if int64(len(b)) > tmp.size {
return 0, fmt.Errorf("write exceeds content length")
}
n, err := tmp.f.Write(b)
tmp.size -= int64(n)
return n, err
}
func (tmp *tmpfile) cleanup() {

View File

@@ -31,11 +31,13 @@ const procfddir = "/proc/self/fd"
type tmpfile struct {
f *os.File
bucket string
objname string
isOTmp bool
size int64
}
func openTmpFile(dir, obj string, size int64) (*tmpfile, error) {
func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) {
// O_TMPFILE allows for a file handle to an unnamed file in the filesystem.
// This can help reduce contention within the namespace (parent directories),
// etc. And will auto cleanup the inode on close if we never link this
@@ -45,30 +47,37 @@ func openTmpFile(dir, obj string, size int64) (*tmpfile, error) {
fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0666)
if err != nil {
// O_TMPFILE not supported, try fallback
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, fmt.Errorf("make temp dir: %w", err)
}
f, err := os.CreateTemp(dir,
fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj))))
fmt.Sprintf("%x.", sha256.Sum256([]byte(obj))))
if err != nil {
return nil, err
}
tmp := &tmpfile{f: f}
tmp := &tmpfile{f: f, bucket: bucket, objname: obj, size: size}
// falloc is best effort, its fine if this fails
if size > 0 {
tmp.falloc(size)
tmp.falloc()
}
return tmp, nil
}
// for O_TMPFILE, filename is /proc/self/fd/<fd> to be used
// later to link file into namespace
f := os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd)))
tmp := &tmpfile{f: f, isOTmp: true}
tmp := &tmpfile{f: f, isOTmp: true, size: size}
// falloc is best effort, its fine if this fails
if size > 0 {
tmp.falloc(size)
tmp.falloc()
}
return tmp, nil
}
func (tmp *tmpfile) falloc(size int64) error {
err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, size)
func (tmp *tmpfile) falloc() error {
err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, tmp.size)
if err != nil {
return fmt.Errorf("fallocate: %v", err)
}
@@ -82,36 +91,33 @@ func (tmp *tmpfile) link() error {
// temp file into place for the object. This ensures the object semantics
// of last upload completed wins and is not some combination of writes
// from simultaneous uploads.
err := os.Remove(tmp.objname)
objPath := filepath.Join(tmp.bucket, tmp.objname)
err := os.Remove(objPath)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove stale path: %w", err)
}
if tmp.isOTmp {
procdir, err := os.Open(procfddir)
if err != nil {
return fmt.Errorf("open proc dir: %w", err)
}
defer procdir.Close()
if !tmp.isOTmp {
// O_TMPFILE not suported, use fallback
return tmp.fallbackLink()
}
dir, err := os.Open(filepath.Dir(tmp.objname))
if err != nil {
return fmt.Errorf("open parent dir: %w", err)
}
defer dir.Close()
procdir, err := os.Open(procfddir)
if err != nil {
return fmt.Errorf("open proc dir: %w", err)
}
defer procdir.Close()
err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
int(dir.Fd()), filepath.Base(tmp.objname), unix.AT_SYMLINK_FOLLOW)
if err != nil {
return fmt.Errorf("link tmpfile: %w", err)
}
dir, err := os.Open(filepath.Dir(objPath))
if err != nil {
return fmt.Errorf("open parent dir: %w", err)
}
defer dir.Close()
err = tmp.f.Close()
if err != nil {
return fmt.Errorf("close tmpfile: %w", err)
}
return nil
err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
int(dir.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW)
if err != nil {
return fmt.Errorf("link tmpfile: %w", err)
}
err = tmp.f.Close()
@@ -119,7 +125,22 @@ func (tmp *tmpfile) link() error {
return fmt.Errorf("close tmpfile: %w", err)
}
err = os.Rename(tmp.f.Name(), tmp.objname)
return nil
}
func (tmp *tmpfile) fallbackLink() error {
tempname := tmp.f.Name()
// cleanup in case anything goes wrong, if rename succeeds then
// this will no longer exist
defer os.Remove(tempname)
err := tmp.f.Close()
if err != nil {
return fmt.Errorf("close tmpfile: %w", err)
}
objPath := filepath.Join(tmp.bucket, tmp.objname)
err = os.Rename(tempname, objPath)
if err != nil {
return fmt.Errorf("rename tmpfile: %w", err)
}
@@ -128,7 +149,13 @@ func (tmp *tmpfile) link() error {
}
func (tmp *tmpfile) Write(b []byte) (int, error) {
return tmp.f.Write(b)
if int64(len(b)) > tmp.size {
return 0, fmt.Errorf("write exceeds content length")
}
n, err := tmp.f.Write(b)
tmp.size -= int64(n)
return n, err
}
func (tmp *tmpfile) cleanup() {