Merge pull request #22 from versity/ben/posix

Ben/posix
This commit is contained in:
Ben McClelland
2023-05-24 14:11:17 -07:00
committed by GitHub
3 changed files with 150 additions and 53 deletions

View File

@@ -210,11 +210,11 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [
}
}
f, err := openTmpFile(".")
f, err := openTmpFile(metaTmpDir, object)
if err != nil {
return nil, fmt.Errorf("open temp file: %w", err)
}
defer f.Close()
defer f.cleanup()
for _, p := range parts {
pf, err := os.Open(filepath.Join(objdir, uploadID, fmt.Sprintf("%v", p.PartNumber)))
@@ -244,7 +244,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [
}
}
}
err = linkTmpFile(f, objname)
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
}
@@ -653,11 +653,15 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length
return "", fmt.Errorf("stat bucket: %w", err)
}
f, err := openTmpFile(".")
sum := sha256.Sum256([]byte(object))
objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part))
f, err := openTmpFile(objdir, partPath)
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
}
defer f.Close()
defer f.cleanup()
hash := md5.New()
tr := io.TeeReader(r, hash)
@@ -666,11 +670,7 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, length
return "", fmt.Errorf("write part data: %w", err)
}
sum := sha256.Sum256([]byte(object))
objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part))
err = linkTmpFile(f, partPath)
err = f.link()
if err != nil {
return "", fmt.Errorf("link object in namespace: %w", err)
}
@@ -702,20 +702,19 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) {
return "", err
}
// TODO: set user attrs
//for k, v := range metadata {
// xattr.Set(name, "user."+k, []byte(v))
//}
for k, v := range po.Metadata {
xattr.Set(name, "user."+k, []byte(v))
}
// set our tag that this dir was specifically put
xattr.Set(name, dirObjKey, nil)
} else {
// object is file
f, err := openTmpFile(".")
f, err := openTmpFile(metaTmpDir, *po.Key)
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
}
defer f.Close()
defer f.cleanup()
// TODO: fallocate based on content length
@@ -723,27 +722,24 @@ func (p *Posix) PutObject(po *s3.PutObjectInput) (string, error) {
rdr := io.TeeReader(po.Body, hash)
_, err = io.Copy(f, rdr)
if err != nil {
f.Close()
return "", fmt.Errorf("write object data: %w", err)
}
dir := filepath.Dir(name)
if dir != "" {
err = mkdirAll(dir, os.FileMode(0755), *po.Bucket, *po.Key)
if err != nil {
f.Close()
return "", fmt.Errorf("make object parent directories: %w", err)
}
}
err = linkTmpFile(f, name)
err = f.link()
if err != nil {
return "", fmt.Errorf("link object in namespace: %w", err)
}
// TODO: set user attrs
//for k, v := range metadata {
// xattr.Set(name, "user."+k, []byte(v))
//}
for k, v := range po.Metadata {
xattr.Set(name, "user."+k, []byte(v))
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum[:])
@@ -864,6 +860,12 @@ func (p *Posix) GetObject(bucket, object, acceptRange string, startOffset, lengt
_, contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
b, err := xattr.Get(objPath, "user.etag")
etag := string(b)
if err != nil {
etag = ""
}
// TODO: fill range request header?
// TODO: parse tags for tag count?
return &s3.GetObjectOutput{
@@ -871,9 +873,9 @@ func (p *Posix) GetObject(bucket, object, acceptRange string, startOffset, lengt
ContentLength: length,
ContentEncoding: &contentEncoding,
ContentType: &contentType,
// ETag: &etag,
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: userMetaData,
ETag: &etag,
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: userMetaData,
}, nil
}

View File

@@ -1,14 +1,56 @@
package posix
import (
"crypto/sha256"
"errors"
"fmt"
"io/fs"
"os"
)
func openTmpFile(dir string) (*os.File, error) {
return nil, fmt.Errorf("not implemented")
type tmpfile struct {
f *os.File
objname string
}
func linkTmpFile(f *os.File, path string) error {
return fmt.Errorf("not implemented")
func openTmpFile(dir, obj string) (*tmpfile, error) {
// Create a temp file for upload while in progress (see link comments below).
f, err := os.CreateTemp(dir,
fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj))))
if err != nil {
return nil, err
}
return &tmpfile{f: f, objname: obj}, nil
}
func (tmp *tmpfile) link() error {
// We use Rename as the atomic operation for object puts. The upload is
// written to a temp file to not conflict with any other simultaneous
// uploads. The final operation is to move the temp file into place for
// the object. This ensures the object semantics of last upload completed
// wins and is not some combination of writes from simultaneous uploads.
err := os.Remove(tmp.objname)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove stale path: %w", err)
}
err = tmp.f.Close()
if err != nil {
return fmt.Errorf("close tmpfile: %w", err)
}
err = os.Rename(tmp.f.Name(), tmp.objname)
if err != nil {
return fmt.Errorf("rename tmpfile: %w", err)
}
return nil
}
func (tmp *tmpfile) Write(b []byte) (int, error) {
return tmp.f.Write(b)
}
func (tmp *tmpfile) cleanup() {
tmp.f.Close()
}

View File

@@ -1,6 +1,7 @@
package posix
import (
"crypto/sha256"
"errors"
"fmt"
"io/fs"
@@ -13,38 +14,90 @@ import (
const procfddir = "/proc/self/fd"
func openTmpFile(dir string) (*os.File, error) {
fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0666)
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd))), nil
type tmpfile struct {
f *os.File
objname string
isOTmp bool
}
func linkTmpFile(f *os.File, path string) error {
err := os.Remove(path)
func openTmpFile(dir, obj string) (*tmpfile, error) {
// O_TMPFILE allows for a file handle to an unnamed file in the filesystem.
// This can help reduce contention within the namespace (parent directories),
// etc. And will auto cleanup the inode on close if we never link this
// file descriptor into the namespace.
// Not all filesystems support this, so fallback to CreateTemp for when
// this is not supported.
fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0666)
if err != nil {
// O_TMPFILE not supported, try fallback
f, err := os.CreateTemp(dir,
fmt.Sprintf("%x\n", sha256.Sum256([]byte(obj))))
if err != nil {
return nil, err
}
return &tmpfile{f: f}, nil
}
f := os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd)))
return &tmpfile{f: f, isOTmp: true}, nil
}
func (tmp *tmpfile) link() error {
// We use Linkat/Rename as the atomic operation for object puts. The
// upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict
// with any other simultaneous uploads. The final operation is to move the
// temp file into place for the object. This ensures the object semantics
// of last upload completed wins and is not some combination of writes
// from simultaneous uploads.
err := os.Remove(tmp.objname)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove stale part: %w", err)
return fmt.Errorf("remove stale path: %w", err)
}
procdir, err := os.Open(procfddir)
if err != nil {
return fmt.Errorf("open proc dir: %w", err)
}
defer procdir.Close()
if tmp.isOTmp {
procdir, err := os.Open(procfddir)
if err != nil {
return fmt.Errorf("open proc dir: %w", err)
}
defer procdir.Close()
dir, err := os.Open(filepath.Dir(path))
if err != nil {
return fmt.Errorf("open parent dir: %w", err)
}
defer dir.Close()
dir, err := os.Open(filepath.Dir(tmp.objname))
if err != nil {
return fmt.Errorf("open parent dir: %w", err)
}
defer dir.Close()
err = unix.Linkat(int(procdir.Fd()), filepath.Base(f.Name()),
int(dir.Fd()), filepath.Base(path), unix.AT_SYMLINK_FOLLOW)
err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
int(dir.Fd()), filepath.Base(tmp.objname), unix.AT_SYMLINK_FOLLOW)
if err != nil {
return fmt.Errorf("link tmpfile: %w", err)
}
err = tmp.f.Close()
if err != nil {
return fmt.Errorf("close tmpfile: %w", err)
}
return nil
}
err = tmp.f.Close()
if err != nil {
return fmt.Errorf("link tmpfile: %w", err)
return fmt.Errorf("close tmpfile: %w", err)
}
err = os.Rename(tmp.f.Name(), tmp.objname)
if err != nil {
return fmt.Errorf("rename tmpfile: %w", err)
}
return nil
}
func (tmp *tmpfile) Write(b []byte) (int, error) {
return tmp.f.Write(b)
}
func (tmp *tmpfile) cleanup() {
tmp.f.Close()
}