diff --git a/backend/posix/posix.go b/backend/posix/posix.go index 903911a..4a74c06 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -210,11 +210,11 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [ } } - f, err := openTmpFile(".") + f, err := openTmpFile(metaTmpDir, object) if err != nil { return nil, fmt.Errorf("open temp file: %w", err) } - defer f.Close() + defer f.cleanup() for _, p := range parts { pf, err := os.Open(filepath.Join(objdir, uploadID, fmt.Sprintf("%v", p.PartNumber))) @@ -244,7 +244,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [ } } } - err = linkTmpFile(f, objname) + err = f.link() if err != nil { return nil, fmt.Errorf("link object in namespace: %w", err) } @@ -653,11 +653,15 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length return "", fmt.Errorf("stat bucket: %w", err) } - f, err := openTmpFile(".") + sum := sha256.Sum256([]byte(object)) + objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part)) + + f, err := openTmpFile(objdir, partPath) if err != nil { return "", fmt.Errorf("open temp file: %w", err) } - defer f.Close() + defer f.cleanup() hash := md5.New() tr := io.TeeReader(r, hash) @@ -666,11 +670,7 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length return "", fmt.Errorf("write part data: %w", err) } - sum := sha256.Sum256([]byte(object)) - objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) - partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part)) - - err = linkTmpFile(f, partPath) + err = f.link() if err != nil { return "", fmt.Errorf("link object in namespace: %w", err) } @@ -702,20 +702,19 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) { return "", err } - // TODO: set user attrs - //for k, v := range metadata { - // xattr.Set(name, "user."+k, []byte(v)) - //} + for k, v := range po.Metadata { + xattr.Set(name, "user."+k, []byte(v)) + } // set our tag that this dir was specifically put xattr.Set(name, dirObjKey, nil) } else { // object is file - f, err := openTmpFile(".") + f, err := openTmpFile(metaTmpDir, *po.Key) if err != nil { return "", fmt.Errorf("open temp file: %w", err) } - defer f.Close() + defer f.cleanup() // TODO: fallocate based on content length @@ -723,27 +722,24 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) { rdr := io.TeeReader(po.Body, hash) _, err = io.Copy(f, rdr) if err != nil { - f.Close() return "", fmt.Errorf("write object data: %w", err) } dir := filepath.Dir(name) if dir != "" { err = mkdirAll(dir, os.FileMode(0755), *po.Bucket, *po.Key) if err != nil { - f.Close() return "", fmt.Errorf("make object parent directories: %w", err) } } - err = linkTmpFile(f, name) + err = f.link() if err != nil { return "", fmt.Errorf("link object in namespace: %w", err) } - // TODO: set user attrs - //for k, v := range metadata { - // xattr.Set(name, "user."+k, []byte(v)) - //} + for k, v := range po.Metadata { + xattr.Set(name, "user."+k, []byte(v)) + } dataSum := hash.Sum(nil) etag := hex.EncodeToString(dataSum[:]) @@ -864,6 +860,12 @@ func (p *Posix) GetObject(bucket, object, acceptRange string, startOffset, lengt _, contentType, contentEncoding := loadUserMetaData(objPath, userMetaData) + b, err := xattr.Get(objPath, "user.etag") + etag := string(b) + if err != nil { + etag = "" + } + // TODO: fill range request header? // TODO: parse tags for tag count? return &s3.GetObjectOutput{ @@ -871,9 +873,9 @@ func (p *Posix) GetObject(bucket, object, acceptRange string, startOffset, lengt ContentLength: length, ContentEncoding: &contentEncoding, ContentType: &contentType, - // ETag: &etag, - LastModified: backend.GetTimePtr(fi.ModTime()), - Metadata: userMetaData, + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + Metadata: userMetaData, }, nil } diff --git a/backend/posix/posix_darwin.go b/backend/posix/posix_darwin.go index 151594c..6a4739e 100644 --- a/backend/posix/posix_darwin.go +++ b/backend/posix/posix_darwin.go @@ -1,14 +1,56 @@ package posix import ( + "crypto/sha256" + "errors" "fmt" + "io/fs" "os" ) -func openTmpFile(dir string) (*os.File, error) { - return nil, fmt.Errorf("not implemented") +type tmpfile struct { + f *os.File + objname string } -func linkTmpFile(f *os.File, path string) error { - return fmt.Errorf("not implemented") +func openTmpFile(dir, obj string) (*tmpfile, error) { + // Create a temp file for upload while in progress (see link comments below). + f, err := os.CreateTemp(dir, + fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj)))) + if err != nil { + return nil, err + } + return &tmpfile{f: f, objname: obj}, nil +} + +func (tmp *tmpfile) link() error { + // We use Rename as the atomic operation for object puts. The upload is + // written to a temp file to not conflict with any other simultaneous + // uploads. The final operation is to move the temp file into place for + // the object. This ensures the object semantics of last upload completed + // wins and is not some combination of writes from simultaneous uploads. + err := os.Remove(tmp.objname) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("remove stale path: %w", err) + } + + err = tmp.f.Close() + if err != nil { + return fmt.Errorf("close tmpfile: %w", err) + } + + err = os.Rename(tmp.f.Name(), tmp.objname) + if err != nil { + return fmt.Errorf("rename tmpfile: %w", err) + } + + return nil +} + +func (tmp *tmpfile) Write(b []byte) (int, error) { + return tmp.f.Write(b) +} + +func (tmp *tmpfile) cleanup() { + tmp.f.Close() } diff --git a/backend/posix/posix_linux.go b/backend/posix/posix_linux.go index 8dd5bec..123533c 100644 --- a/backend/posix/posix_linux.go +++ b/backend/posix/posix_linux.go @@ -1,6 +1,7 @@ package posix import ( + "crypto/sha256" "errors" "fmt" "io/fs" @@ -13,38 +14,90 @@ import ( const procfddir = "/proc/self/fd" -func openTmpFile(dir string) (*os.File, error) { - fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0666) - if err != nil { - return nil, err - } - - return os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd))), nil +type tmpfile struct { + f *os.File + objname string + isOTmp bool } -func linkTmpFile(f *os.File, path string) error { - err := os.Remove(path) +func openTmpFile(dir, obj string) (*tmpfile, error) { + // O_TMPFILE allows for a file handle to an unnamed file in the filesystem. + // This can help reduce contention within the namespace (parent directories), + // etc. And will auto cleanup the inode on close if we never link this + // file descriptor into the namespace. + // Not all filesystems support this, so fallback to CreateTemp for when + // this is not supported. + fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0666) + if err != nil { + // O_TMPFILE not supported, try fallback + f, err := os.CreateTemp(dir, + fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj)))) + if err != nil { + return nil, err + } + return &tmpfile{f: f}, nil + } + + f := os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd))) + return &tmpfile{f: f, isOTmp: true}, nil +} + +func (tmp *tmpfile) link() error { + // We use Linkat/Rename as the atomic operation for object puts. The + // upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict + // with any other simultaneous uploads. The final operation is to move the + // temp file into place for the object. This ensures the object semantics + // of last upload completed wins and is not some combination of writes + // from simultaneous uploads. + err := os.Remove(tmp.objname) if err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("remove stale part: %w", err) + return fmt.Errorf("remove stale path: %w", err) } - procdir, err := os.Open(procfddir) - if err != nil { - return fmt.Errorf("open proc dir: %w", err) - } - defer procdir.Close() + if tmp.isOTmp { + procdir, err := os.Open(procfddir) + if err != nil { + return fmt.Errorf("open proc dir: %w", err) + } + defer procdir.Close() - dir, err := os.Open(filepath.Dir(path)) - if err != nil { - return fmt.Errorf("open parent dir: %w", err) - } - defer dir.Close() + dir, err := os.Open(filepath.Dir(tmp.objname)) + if err != nil { + return fmt.Errorf("open parent dir: %w", err) + } + defer dir.Close() - err = unix.Linkat(int(procdir.Fd()), filepath.Base(f.Name()), - int(dir.Fd()), filepath.Base(path), unix.AT_SYMLINK_FOLLOW) + err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), + int(dir.Fd()), filepath.Base(tmp.objname), unix.AT_SYMLINK_FOLLOW) + if err != nil { + return fmt.Errorf("link tmpfile: %w", err) + } + + err = tmp.f.Close() + if err != nil { + return fmt.Errorf("close tmpfile: %w", err) + } + + return nil + } + + err = tmp.f.Close() if err != nil { - return fmt.Errorf("link tmpfile: %w", err) + return fmt.Errorf("close tmpfile: %w", err) + } + + err = os.Rename(tmp.f.Name(), tmp.objname) + if err != nil { + return fmt.Errorf("rename tmpfile: %w", err) } return nil } + +func (tmp *tmpfile) Write(b []byte) (int, error) { + return tmp.f.Write(b) +} + +func (tmp *tmpfile) cleanup() { + tmp.f.Close() +}