From 981894aef20beef226918a7926d8d8727931608f Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Wed, 31 May 2023 04:09:47 +0000 Subject: [PATCH 1/3] posix: fix fallback tempfile naming --- backend/posix/posix.go | 10 ++-- backend/posix/posix_darwin.go | 27 ++++++++--- backend/posix/posix_linux.go | 89 ++++++++++++++++++++++------------- 3 files changed, 83 insertions(+), 43 deletions(-) diff --git a/backend/posix/posix.go b/backend/posix/posix.go index fccf7c0..fa547c8 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -251,7 +251,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [ } } - f, err := openTmpFile(metaTmpDir, object, 0) + f, err := openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, 0) if err != nil { return nil, fmt.Errorf("open temp file: %w", err) } @@ -686,10 +686,11 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length } sum := sha256.Sum256([]byte(object)) - objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum)) partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part)) - f, err := openTmpFile(objdir, partPath, length) + f, err := openTmpFile(filepath.Join(bucket, objdir), + bucket, partPath, length) if err != nil { return "", fmt.Errorf("open temp file: %w", err) } @@ -742,7 +743,8 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) { xattr.Set(name, dirObjKey, nil) } else { // object is file - f, err := openTmpFile(metaTmpDir, *po.Key, po.ContentLength) + f, err := openTmpFile(filepath.Join(*po.Bucket, metaTmpDir), + *po.Bucket, *po.Key, po.ContentLength) if err != nil { return "", fmt.Errorf("open temp file: %w", err) } diff --git a/backend/posix/posix_darwin.go b/backend/posix/posix_darwin.go index 04cbfb1..c8afcb0 100644 --- a/backend/posix/posix_darwin.go +++ b/backend/posix/posix_darwin.go @@ -20,30 +20,39 @@ import ( "fmt" "io/fs" "os" + "path/filepath" ) type tmpfile struct { f *os.File + bucket string objname string + size int64 } -func openTmpFile(dir, obj string, size int64) (*tmpfile, error) { +func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) { // Create a temp file for upload while in progress (see link comments below). f, err := os.CreateTemp(dir, - fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj)))) + fmt.Sprintf("%x.", sha256.Sum256([]byte(obj)))) if err != nil { return nil, err } - return &tmpfile{f: f, objname: obj}, nil + return &tmpfile{f: f, bucket: bucket, objname: obj, size: size}, nil } func (tmp *tmpfile) link() error { + tempname := tmp.f.Name() + // cleanup in case anything goes wrong, if rename succeeds then + // this will no longer exist + defer os.Remove(tempname) + // We use Rename as the atomic operation for object puts. The upload is // written to a temp file to not conflict with any other simultaneous // uploads. The final operation is to move the temp file into place for // the object. This ensures the object semantics of last upload completed // wins and is not some combination of writes from simultaneous uploads. - err := os.Remove(tmp.objname) + objPath := filepath.Join(tmp.bucket, tmp.objname) + err := os.Remove(objPath) if err != nil && !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("remove stale path: %w", err) } @@ -53,7 +62,7 @@ func (tmp *tmpfile) link() error { return fmt.Errorf("close tmpfile: %w", err) } - err = os.Rename(tmp.f.Name(), tmp.objname) + err = os.Rename(tempname, objPath) if err != nil { return fmt.Errorf("rename tmpfile: %w", err) } @@ -62,7 +71,13 @@ func (tmp *tmpfile) link() error { } func (tmp *tmpfile) Write(b []byte) (int, error) { - return tmp.f.Write(b) + if int64(len(b)) > tmp.size { + return 0, fmt.Errorf("write exceeds content length") + } + + n, err := tmp.f.Write(b) + tmp.size -= int64(n) + return n, err } func (tmp *tmpfile) cleanup() { diff --git a/backend/posix/posix_linux.go b/backend/posix/posix_linux.go index 7d71d09..de83884 100644 --- a/backend/posix/posix_linux.go +++ b/backend/posix/posix_linux.go @@ -31,11 +31,13 @@ const procfddir = "/proc/self/fd" type tmpfile struct { f *os.File + bucket string objname string isOTmp bool + size int64 } -func openTmpFile(dir, obj string, size int64) (*tmpfile, error) { +func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) { // O_TMPFILE allows for a file handle to an unnamed file in the filesystem. // This can help reduce contention within the namespace (parent directories), // etc. And will auto cleanup the inode on close if we never link this @@ -46,29 +48,32 @@ func openTmpFile(dir, obj string, size int64) (*tmpfile, error) { if err != nil { // O_TMPFILE not supported, try fallback f, err := os.CreateTemp(dir, - fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj)))) + fmt.Sprintf("%x.", sha256.Sum256([]byte(obj)))) if err != nil { return nil, err } - tmp := &tmpfile{f: f} + tmp := &tmpfile{f: f, bucket: bucket, objname: obj, size: size} // falloc is best effort, its fine if this fails if size > 0 { - tmp.falloc(size) + tmp.falloc() } return tmp, nil } + // for O_TMPFILE, filename is /proc/self/fd/ to be used + // later to link file into namespace f := os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd))) - tmp := &tmpfile{f: f, isOTmp: true} + + tmp := &tmpfile{f: f, isOTmp: true, size: size} // falloc is best effort, its fine if this fails if size > 0 { - tmp.falloc(size) + tmp.falloc() } return tmp, nil } -func (tmp *tmpfile) falloc(size int64) error { - err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, size) +func (tmp *tmpfile) falloc() error { + err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, tmp.size) if err != nil { return fmt.Errorf("fallocate: %v", err) } @@ -82,36 +87,33 @@ func (tmp *tmpfile) link() error { // temp file into place for the object. This ensures the object semantics // of last upload completed wins and is not some combination of writes // from simultaneous uploads. - err := os.Remove(tmp.objname) + objPath := filepath.Join(tmp.bucket, tmp.objname) + err := os.Remove(objPath) if err != nil && !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("remove stale path: %w", err) } - if tmp.isOTmp { - procdir, err := os.Open(procfddir) - if err != nil { - return fmt.Errorf("open proc dir: %w", err) - } - defer procdir.Close() + if !tmp.isOTmp { + // O_TMPFILE not suported, use fallback + return tmp.fallbackLink() + } - dir, err := os.Open(filepath.Dir(tmp.objname)) - if err != nil { - return fmt.Errorf("open parent dir: %w", err) - } - defer dir.Close() + procdir, err := os.Open(procfddir) + if err != nil { + return fmt.Errorf("open proc dir: %w", err) + } + defer procdir.Close() - err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), - int(dir.Fd()), filepath.Base(tmp.objname), unix.AT_SYMLINK_FOLLOW) - if err != nil { - return fmt.Errorf("link tmpfile: %w", err) - } + dir, err := os.Open(filepath.Dir(objPath)) + if err != nil { + return fmt.Errorf("open parent dir: %w", err) + } + defer dir.Close() - err = tmp.f.Close() - if err != nil { - return fmt.Errorf("close tmpfile: %w", err) - } - - return nil + err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), + int(dir.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW) + if err != nil { + return fmt.Errorf("link tmpfile: %w", err) } err = tmp.f.Close() @@ -119,7 +121,22 @@ func (tmp *tmpfile) link() error { return fmt.Errorf("close tmpfile: %w", err) } - err = os.Rename(tmp.f.Name(), tmp.objname) + return nil +} + +func (tmp *tmpfile) fallbackLink() error { + tempname := tmp.f.Name() + // cleanup in case anything goes wrong, if rename succeeds then + // this will no longer exist + defer os.Remove(tempname) + + err := tmp.f.Close() + if err != nil { + return fmt.Errorf("close tmpfile: %w", err) + } + + objPath := filepath.Join(tmp.bucket, tmp.objname) + err = os.Rename(tempname, objPath) if err != nil { return fmt.Errorf("rename tmpfile: %w", err) } @@ -128,7 +145,13 @@ func (tmp *tmpfile) link() error { } func (tmp *tmpfile) Write(b []byte) (int, error) { - return tmp.f.Write(b) + if int64(len(b)) > tmp.size { + return 0, fmt.Errorf("write exceeds content length") + } + + n, err := tmp.f.Write(b) + tmp.size -= int64(n) + return n, err } func (tmp *tmpfile) cleanup() { From b8cb3f774d58062429eb23763087896ff2d74ae4 Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Tue, 30 May 2023 21:45:51 -0700 Subject: [PATCH 2/3] posix: make temp dir if not already exists --- backend/posix/posix_darwin.go | 4 ++++ backend/posix/posix_linux.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/backend/posix/posix_darwin.go b/backend/posix/posix_darwin.go index c8afcb0..e67f93b 100644 --- a/backend/posix/posix_darwin.go +++ b/backend/posix/posix_darwin.go @@ -32,6 +32,10 @@ type tmpfile struct { func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) { // Create a temp file for upload while in progress (see link comments below). + err := os.MkdirAll(dir, 0700) + if err != nil { + return nil, fmt.Errorf("make temp dir: %w", err) + } f, err := os.CreateTemp(dir, fmt.Sprintf("%x.", sha256.Sum256([]byte(obj)))) if err != nil { diff --git a/backend/posix/posix_linux.go b/backend/posix/posix_linux.go index de83884..9ce49d3 100644 --- a/backend/posix/posix_linux.go +++ b/backend/posix/posix_linux.go @@ -47,6 +47,10 @@ func openTmpFile(dir, bucket, obj string, size int64) (*tmpfile, error) { fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0666) if err != nil { // O_TMPFILE not supported, try fallback + err := os.MkdirAll(dir, 0700) + if err != nil { + return nil, fmt.Errorf("make temp dir: %w", err) + } f, err := os.CreateTemp(dir, fmt.Sprintf("%x.", sha256.Sum256([]byte(obj)))) if err != nil { From 04ab589aeb29a0e4d6ddd26f6899d71adfbd29e6 Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Tue, 30 May 2023 21:46:21 -0700 Subject: [PATCH 3/3] posix: fix put object etag --- backend/posix/posix.go | 81 ++++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 39 deletions(-) diff --git a/backend/posix/posix.go b/backend/posix/posix.go index fa547c8..0c4aa9b 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -726,8 +726,6 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) { name := filepath.Join(*po.Bucket, *po.Key) - etag := "" - if strings.HasSuffix(*po.Key, "/") { // object is directory err = mkdirAll(name, os.FileMode(0755), *po.Bucket, *po.Key) @@ -741,47 +739,52 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) { // set our attribute that this dir was specifically put xattr.Set(name, dirObjKey, nil) - } else { - // object is file - f, err := openTmpFile(filepath.Join(*po.Bucket, metaTmpDir), - *po.Bucket, *po.Key, po.ContentLength) + + // TODO: what etag should be returned here + // and we should set etag xattr to identify dir was + // specifically uploaded + return "", nil + } + + // object is file + f, err := openTmpFile(filepath.Join(*po.Bucket, metaTmpDir), + *po.Bucket, *po.Key, po.ContentLength) + if err != nil { + return "", fmt.Errorf("open temp file: %w", err) + } + defer f.cleanup() + + hash := md5.New() + rdr := io.TeeReader(po.Body, hash) + _, err = io.Copy(f, rdr) + if err != nil { + return "", fmt.Errorf("write object data: %w", err) + } + dir := filepath.Dir(name) + if dir != "" { + err = mkdirAll(dir, os.FileMode(0755), *po.Bucket, *po.Key) if err != nil { - return "", fmt.Errorf("open temp file: %w", err) + return "", fmt.Errorf("make object parent directories: %w", err) } - defer f.cleanup() + } - hash := md5.New() - rdr := io.TeeReader(po.Body, hash) - _, err = io.Copy(f, rdr) + err = f.link() + if err != nil { + return "", fmt.Errorf("link object in namespace: %w", err) + } + + for k, v := range po.Metadata { + xattr.Set(name, "user."+k, []byte(v)) + } + + dataSum := hash.Sum(nil) + etag := hex.EncodeToString(dataSum[:]) + xattr.Set(name, "user.etag", []byte(etag)) + + if newObjUID != 0 || newObjGID != 0 { + err = os.Chown(name, newObjUID, newObjGID) if err != nil { - return "", fmt.Errorf("write object data: %w", err) - } - dir := filepath.Dir(name) - if dir != "" { - err = mkdirAll(dir, os.FileMode(0755), *po.Bucket, *po.Key) - if err != nil { - return "", fmt.Errorf("make object parent directories: %w", err) - } - } - - err = f.link() - if err != nil { - return "", fmt.Errorf("link object in namespace: %w", err) - } - - for k, v := range po.Metadata { - xattr.Set(name, "user."+k, []byte(v)) - } - - dataSum := hash.Sum(nil) - etag := hex.EncodeToString(dataSum[:]) - xattr.Set(name, "user.etag", []byte(etag)) - - if newObjUID != 0 || newObjGID != 0 { - err = os.Chown(name, newObjUID, newObjGID) - if err != nil { - return "", fmt.Errorf("set object uid/gid: %v", err) - } + return "", fmt.Errorf("set object uid/gid: %v", err) } }