From 80f8b1b883596f69aa74ecb4839e91f477923c22 Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Wed, 17 May 2023 16:00:00 -0700 Subject: [PATCH 1/3] posix: initial bucket requests --- backend/common.go | 14 +++++- backend/posix/posix.go | 96 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/backend/common.go b/backend/common.go index d96432d..080ee86 100644 --- a/backend/common.go +++ b/backend/common.go @@ -1,6 +1,10 @@ package backend -import "github.com/aws/aws-sdk-go-v2/service/s3/types" +import ( + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) func IsValidBucketName(name string) bool { return true } @@ -9,3 +13,11 @@ type ByBucketName []types.Bucket func (d ByBucketName) Len() int { return len(d) } func (d ByBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } func (d ByBucketName) Less(i, j int) bool { return *d[i].Name < *d[j].Name } + +func GetStringPtr(s string) *string { + return &s +} + +func GetTimePtr(t time.Time) *time.Time { + return &t +} diff --git a/backend/posix/posix.go b/backend/posix/posix.go index 55af7df..d0406a8 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -1,7 +1,16 @@ package posix import ( + "fmt" + "os" + "path/filepath" + "sort" + "syscall" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/versity/scoutgw/backend" + "github.com/versity/scoutgw/s3err" ) type Posix struct { @@ -9,3 +18,90 @@ type Posix struct { } var _ backend.Backend = &Posix{} + +const metaTmpDir = ".sgwtmp" + +func (p *Posix) ListBuckets() (*s3.ListBucketsOutput, error) { + entries, err := os.ReadDir(".") + if err != nil { + return nil, fmt.Errorf("readdir buckets: %w", err) + } + + var buckets []types.Bucket + for _, entry := range entries { + if !entry.IsDir() { + // buckets must be a directory + continue + } + + fi, err := entry.Info() + if err != nil { + // skip entries returning errors + continue + } + + buckets = append(buckets, types.Bucket{ + Name: backend.GetStringPtr(entry.Name()), + CreationDate: backend.GetTimePtr(fi.ModTime()), + }) + } + + sort.Sort(backend.ByBucketName(buckets)) + + return &s3.ListBucketsOutput{ + Buckets: buckets, + }, nil +} + +func (p *Posix) HeadBucket(bucket string) (*s3.HeadBucketOutput, error) { + _, err := os.Lstat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + return &s3.HeadBucketOutput{}, nil +} + +func (p *Posix) PutBucket(bucket string) error { + err := os.Mkdir(bucket, 0777) + if err != nil && os.IsExist(err) { + return s3err.GetAPIError(s3err.ErrBucketAlreadyExists) + } + if err != nil { + return fmt.Errorf("mkdir bucket: %w", err) + } + + return nil +} + +func (p *Posix) DeleteBucket(bucket string) error { + names, err := os.ReadDir(bucket) + if err != nil && os.IsNotExist(err) { + return s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return fmt.Errorf("readdir bucket: %w", err) + } + + if len(names) == 1 && names[0].Name() == metaTmpDir { + // if .sgwtmp is only item in directory + // then clean this up before trying to remove the bucket + err = os.RemoveAll(filepath.Join(bucket, metaTmpDir)) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("remove temp dir: %w", err) + } + } + + err = os.Remove(bucket) + if err != nil && err.(*os.PathError).Err == syscall.ENOTEMPTY { + return s3err.GetAPIError(s3err.ErrBucketNotEmpty) + } + if err != nil { + return fmt.Errorf("remove bucket: %w", err) + } + + return nil +} From b55f4b79d3a68ae9709e97438cac80f7419f7bde Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Fri, 19 May 2023 11:11:55 -0700 Subject: [PATCH 2/3] posix: initial mulipart requests --- backend/posix/posix.go | 581 +++++++++++++++++++++++++++++++++- backend/posix/posix_darwin.go | 14 + backend/posix/posix_linux.go | 50 +++ go.mod | 5 +- go.sum | 3 + 5 files changed, 650 insertions(+), 3 deletions(-) create mode 100644 backend/posix/posix_darwin.go create mode 100644 backend/posix/posix_linux.go diff --git a/backend/posix/posix.go b/backend/posix/posix.go index d0406a8..3fb1f12 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -1,14 +1,22 @@ package posix import ( + "crypto/md5" + "crypto/sha256" + "encoding/hex" "fmt" + "io" "os" "path/filepath" "sort" + "strconv" + "strings" "syscall" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/google/uuid" + "github.com/pkg/xattr" "github.com/versity/scoutgw/backend" "github.com/versity/scoutgw/s3err" ) @@ -19,7 +27,17 @@ type Posix struct { var _ backend.Backend = &Posix{} -const metaTmpDir = ".sgwtmp" +const ( + metaTmpDir = ".sgwtmp" + metaTmpMultipartDir = metaTmpDir + "/multipart" + onameAttr = "user.objname" + tagHdr = "X-Amz-Tagging" +) + +var ( + newObjUID = 0 + newObjGID = 0 +) func (p *Posix) ListBuckets() (*s3.ListBucketsOutput, error) { entries, err := os.ReadDir(".") @@ -105,3 +123,564 @@ func (p *Posix) DeleteBucket(bucket string) error { return nil } + +func (p *Posix) CreateMultipartUpload(mpu *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) { + bucket := *mpu.Bucket + object := *mpu.Key + + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + // generate random uuid for upload id + uploadID := uuid.New().String() + // hash object name for multipart container + objNameSum := sha256.Sum256([]byte(*mpu.Key)) + // multiple uploads for same object name allowed, + // they will all go into the same hashed name directory + objdir := filepath.Join(bucket, metaTmpMultipartDir, + fmt.Sprintf("%x", objNameSum)) + // the unique upload id is a directory for all of the parts + // associated with this specific multipart upload + err = os.MkdirAll(filepath.Join(objdir, uploadID), 0755) + if err != nil { + return nil, fmt.Errorf("create upload temp dir: %w", err) + } + + // set an xattr with the original object name so that we can + // map the hashed name back to the original object name + err = xattr.Set(objdir, onameAttr, []byte(object)) + if err != nil { + // if we fail, cleanup the container directories + // but ignore errors because there might still be + // other uploads for the same object name outstanding + os.RemoveAll(filepath.Join(objdir, uploadID)) + os.Remove(objdir) + return nil, fmt.Errorf("set name attr for upload: %w", err) + } + + // set user attrs + for k, v := range mpu.Metadata { + xattr.Set(filepath.Join(objdir, uploadID), "user."+k, []byte(v)) + } + + return &s3.CreateMultipartUploadOutput{ + Bucket: &bucket, + Key: &object, + UploadId: &uploadID, + }, nil +} + +func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts []types.Part) (*s3.CompleteMultipartUploadOutput, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + sum, err := p.checkUploadIDExists(bucket, object, uploadID) + if err != nil { + return nil, err + } + + objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + + // check all parts ok + last := len(parts) - 1 + partsize := int64(0) + for i, p := range parts { + fi, err := os.Lstat(filepath.Join(objdir, uploadID, fmt.Sprintf("%v", p.PartNumber))) + if err != nil { + return nil, s3err.GetAPIError(s3err.ErrInvalidPart) + } + + if i == 0 { + partsize = fi.Size() + } + // all parts except the last need to be the same size + if i < last && partsize != fi.Size() { + return nil, s3err.GetAPIError(s3err.ErrInvalidPart) + } + } + + f, err := openTmpFile(".") + if err != nil { + return nil, fmt.Errorf("open temp file: %w", err) + } + defer f.Close() + + for _, p := range parts { + pf, err := os.Open(filepath.Join(objdir, uploadID, fmt.Sprintf("%v", p.PartNumber))) + if err != nil { + return nil, fmt.Errorf("open part %v: %v", p.PartNumber, err) + } + _, err = io.Copy(f, pf) + pf.Close() + if err != nil { + return nil, fmt.Errorf("copy part %v: %v", p.PartNumber, err) + } + } + + userDefined := make(map[string]string) + upiddir := filepath.Join(objdir, uploadID) + fillUserDefined(upiddir, userDefined) + + b, err := xattr.Get(upiddir, "user."+tagHdr) + tags := string(b) + if err != nil { + tags = "" + } + if tags != "" { + userDefined[tagHdr] = tags + } + + b, err = xattr.Get(upiddir, "user.content-type") + contentType := string(b) + if err != nil { + contentType = "" + } + if contentType != "" { + userDefined["content-type"] = contentType + } + + b, err = xattr.Get(upiddir, "user.content-encoding") + contentEncoding := string(b) + if err != nil { + contentEncoding = "" + } + if contentEncoding != "" { + userDefined["content-encoding"] = contentEncoding + } + + objname := filepath.Join(bucket, object) + dir := filepath.Dir(objname) + if dir != "" { + if err = mkdirAll(dir, os.FileMode(0755), bucket, object); err != nil { + if err != nil && os.IsExist(err) { + return nil, s3err.GetAPIError(s3err.ErrObjectParentIsFile) + } + if err != nil { + return nil, fmt.Errorf("make object parent directories: %w", err) + } + } + } + err = linkTmpFile(f, objname) + if err != nil { + return nil, fmt.Errorf("link object in namespace: %w", err) + } + + for k, v := range userDefined { + err = xattr.Set(objname, "user."+k, []byte(v)) + if err != nil { + // cleanup object if returning error + os.Remove(objname) + return nil, fmt.Errorf("set user attr %q: %w", k, err) + } + } + + // Calculate s3 compatible md5sum for complete multipart. + s3MD5 := getMultipartMD5(parts) + + err = xattr.Set(objname, "user.etag", []byte(s3MD5)) + if err != nil { + // cleanup object if returning error + os.Remove(objname) + return nil, fmt.Errorf("set etag attr: %w", err) + } + + if newObjUID != 0 || newObjGID != 0 { + err = os.Chown(objname, newObjUID, newObjGID) + if err != nil { + // cleanup object if returning error + os.Remove(objname) + return nil, fmt.Errorf("set object uid/gid: %w", err) + } + } + + // cleanup tmp dirs + os.RemoveAll(upiddir) + // use Remove for objdir in case there are still other uploads + // for same object name outstanding + os.Remove(objdir) + + return &s3.CompleteMultipartUploadOutput{ + Bucket: &bucket, + ETag: &s3MD5, + Key: &object, + }, nil +} + +func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) { + sum := sha256.Sum256([]byte(object)) + objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + + _, err := os.Stat(filepath.Join(objdir, uploadID)) + if err != nil && os.IsNotExist(err) { + return [32]byte{}, s3err.GetAPIError(s3err.ErrNoSuchUpload) + } + if err != nil { + return [32]byte{}, fmt.Errorf("stat upload: %w", err) + } + return sum, nil +} + +func fillUserDefined(path string, m map[string]string) { + ents, err := xattr.List(path) + if err != nil || len(ents) == 0 { + return + } + for _, e := range ents { + if !isValidMeta(e) { + continue + } + b, err := xattr.Get(path, e) + if err == syscall.ENODATA { + m[strings.TrimPrefix(e, "user.")] = "" + continue + } + if err != nil { + continue + } + m[strings.TrimPrefix(e, "user.")] = string(b) + } +} + +func isValidMeta(val string) bool { + if strings.HasPrefix(val, "user.X-Amz-Meta") { + return true + } + if strings.HasPrefix(val, "user.x-amz-meta") { + return true + } + if strings.EqualFold(val, "user.Expires") { + return true + } + if strings.EqualFold(val, "user.expires") { + return true + } + return false +} + +// mkdirAll is similar to os.MkdirAll but it will also set uid/gid when +// making new directories +func mkdirAll(path string, perm os.FileMode, bucket, object string) error { + // Fast path: if we can tell whether path is a directory or file, stop with success or error. + dir, err := os.Stat(path) + if err == nil { + if dir.IsDir() { + return nil + } + return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR} + } + + // Slow path: make sure parent exists and then call Mkdir for path. + i := len(path) + for i > 0 && os.IsPathSeparator(path[i-1]) { // Skip trailing path separator. + i-- + } + + j := i + for j > 0 && !os.IsPathSeparator(path[j-1]) { // Scan backward over element. + j-- + } + + if j > 1 { + // Create parent. + err = mkdirAll(path[:j-1], perm, bucket, object) + if err != nil { + return err + } + } + + // Parent now exists; invoke Mkdir and use its result. + err = os.Mkdir(path, perm) + if err != nil { + // Handle arguments like "foo/." by + // double-checking that directory doesn't exist. + dir, err1 := os.Lstat(path) + if err1 == nil && dir.IsDir() { + return nil + } + return s3err.GetAPIError(s3err.ErrObjectParentIsFile) + } + if newObjUID != 0 || newObjGID != 0 { + err = os.Chown(path, newObjUID, newObjGID) + if err != nil { + return fmt.Errorf("set parent ownership: %w", err) + } + } + return nil +} + +func getMultipartMD5(parts []types.Part) string { + var partsEtagBytes []byte + for _, part := range parts { + partsEtagBytes = append(partsEtagBytes, getEtagBytes(*part.ETag)...) + } + s3MD5 := fmt.Sprintf("%s-%d", md5String(partsEtagBytes), len(parts)) + return s3MD5 +} + +func getEtagBytes(etag string) []byte { + decode, err := hex.DecodeString(strings.ReplaceAll(etag, string('"'), "")) + if err != nil { + return []byte(etag) + } + return decode +} + +func md5String(data []byte) string { + sum := md5.Sum(data) + return hex.EncodeToString(sum[:]) +} + +func (p *Posix) AbortMultipartUpload(mpu *s3.AbortMultipartUploadInput) error { + bucket := *mpu.Bucket + object := *mpu.Key + uploadID := *mpu.UploadId + + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return fmt.Errorf("stat bucket: %w", err) + } + + sum := sha256.Sum256([]byte(object)) + objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + + _, err = os.Stat(filepath.Join(objdir, uploadID)) + if err != nil { + return s3err.GetAPIError(s3err.ErrNoSuchUpload) + } + + err = os.RemoveAll(filepath.Join(objdir, uploadID)) + if err != nil { + return fmt.Errorf("remove multipart upload container: %w", err) + } + os.Remove(objdir) + + return nil +} + +func (p *Posix) ListMultipartUploads(mpu *s3.ListMultipartUploadsInput) (*s3.ListMultipartUploadsOutput, error) { + bucket := *mpu.Bucket + + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + // ignore readdir error and use the empty list returned + objs, _ := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir)) + + var uploads []types.MultipartUpload + + keyMarker := *mpu.KeyMarker + uploadIDMarker := *mpu.UploadIdMarker + var pastMarker bool + if keyMarker == "" && uploadIDMarker == "" { + pastMarker = true + } + + for i, obj := range objs { + if !obj.IsDir() { + continue + } + + b, err := xattr.Get(filepath.Join(bucket, metaTmpMultipartDir, obj.Name()), onameAttr) + if err != nil { + continue + } + objectName := string(b) + if !strings.HasPrefix(objectName, *mpu.Prefix) { + continue + } + + upids, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, obj.Name())) + if err != nil { + continue + } + + for j, upid := range upids { + if !upid.IsDir() { + continue + } + + if objectName == keyMarker || upid.Name() == uploadIDMarker { + pastMarker = true + continue + } + if keyMarker != "" && uploadIDMarker != "" && !pastMarker { + continue + } + + userDefined := make(map[string]string) + upiddir := filepath.Join(bucket, metaTmpMultipartDir, obj.Name(), upid.Name()) + fillUserDefined(upiddir, userDefined) + + uploadID := upid.Name() + uploads = append(uploads, types.MultipartUpload{ + Key: &objectName, + UploadId: &uploadID, + }) + if len(uploads) == int(mpu.MaxUploads) { + return &s3.ListMultipartUploadsOutput{ + Bucket: &bucket, + Delimiter: mpu.Delimiter, + IsTruncated: i != len(objs) || j != len(upids), + KeyMarker: &keyMarker, + MaxUploads: mpu.MaxUploads, + NextKeyMarker: &objectName, + NextUploadIdMarker: &uploadID, + Prefix: mpu.Prefix, + UploadIdMarker: mpu.UploadIdMarker, + Uploads: uploads, + }, nil + } + } + } + + return &s3.ListMultipartUploadsOutput{ + Bucket: &bucket, + Delimiter: mpu.Delimiter, + KeyMarker: &keyMarker, + MaxUploads: mpu.MaxUploads, + Prefix: mpu.Prefix, + UploadIdMarker: mpu.UploadIdMarker, + Uploads: uploads, + }, nil +} + +func (p *Posix) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (*s3.ListPartsOutput, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + sum, err := p.checkUploadIDExists(bucket, object, uploadID) + if err != nil { + return nil, err + } + + objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + + ents, err := os.ReadDir(filepath.Join(objdir, uploadID)) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchUpload) + } + if err != nil { + return nil, fmt.Errorf("readdir upload: %w", err) + } + + var parts []types.Part + for _, e := range ents { + pn, _ := strconv.Atoi(e.Name()) + if pn <= partNumberMarker { + continue + } + + partPath := filepath.Join(objdir, uploadID, e.Name()) + b, err := xattr.Get(partPath, "user.etag") + etag := string(b) + if err != nil { + etag = "" + } + + fi, err := os.Lstat(partPath) + if err != nil { + continue + } + + parts = append(parts, types.Part{ + PartNumber: int32(pn), + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + Size: fi.Size(), + }) + } + + sort.Slice(parts, + func(i int, j int) bool { return parts[i].PartNumber < parts[j].PartNumber }) + + oldLen := len(parts) + if len(parts) > maxParts { + parts = parts[:maxParts] + } + newLen := len(parts) + + nextpart := int32(0) + if len(parts) != 0 { + nextpart = parts[len(parts)-1].PartNumber + } + + userDefined := make(map[string]string) + upiddir := filepath.Join(objdir, uploadID) + fillUserDefined(upiddir, userDefined) + + return &s3.ListPartsOutput{ + Bucket: &bucket, + IsTruncated: oldLen != newLen, + Key: &object, + MaxParts: int32(maxParts), + NextPartNumberMarker: backend.GetStringPtr(fmt.Sprintf("%v", nextpart)), + PartNumberMarker: backend.GetStringPtr(fmt.Sprintf("%v", partNumberMarker)), + Parts: parts, + UploadId: &uploadID, + }, nil +} + +// TODO: copy part +// func (p *Posix) CopyPart(srcBucket, srcObject, DstBucket, uploadID, rangeHeader string, part int) (*types.CopyPartResult, error) { +// } + +func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, r io.Reader) (string, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return "", s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return "", fmt.Errorf("stat bucket: %w", err) + } + + f, err := openTmpFile(".") + if err != nil { + return "", fmt.Errorf("open temp file: %w", err) + } + defer f.Close() + + hash := md5.New() + tr := io.TeeReader(r, hash) + _, err = io.Copy(f, tr) + if err != nil { + return "", fmt.Errorf("write part data: %w", err) + } + + sum := sha256.Sum256([]byte(object)) + objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", part)) + + err = linkTmpFile(f, partPath) + if err != nil { + return "", fmt.Errorf("link object in namespace: %w", err) + } + + dataSum := hash.Sum(nil) + etag := hex.EncodeToString(dataSum[:]) + xattr.Set(partPath, "user.etag", []byte(etag)) + + return etag, nil +} diff --git a/backend/posix/posix_darwin.go b/backend/posix/posix_darwin.go new file mode 100644 index 0000000..151594c --- /dev/null +++ b/backend/posix/posix_darwin.go @@ -0,0 +1,14 @@ +package posix + +import ( + "fmt" + "os" +) + +func openTmpFile(dir string) (*os.File, error) { + return nil, fmt.Errorf("not implemented") +} + +func linkTmpFile(f *os.File, path string) error { + return fmt.Errorf("not implemented") +} diff --git a/backend/posix/posix_linux.go b/backend/posix/posix_linux.go new file mode 100644 index 0000000..8dd5bec --- /dev/null +++ b/backend/posix/posix_linux.go @@ -0,0 +1,50 @@ +package posix + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "strconv" + + "golang.org/x/sys/unix" +) + +const procfddir = "/proc/self/fd" + +func openTmpFile(dir string) (*os.File, error) { + fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0666) + if err != nil { + return nil, err + } + + return os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd))), nil +} + +func linkTmpFile(f *os.File, path string) error { + err := os.Remove(path) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("remove stale part: %w", err) + } + + procdir, err := os.Open(procfddir) + if err != nil { + return fmt.Errorf("open proc dir: %w", err) + } + defer procdir.Close() + + dir, err := os.Open(filepath.Dir(path)) + if err != nil { + return fmt.Errorf("open parent dir: %w", err) + } + defer dir.Close() + + err = unix.Linkat(int(procdir.Fd()), filepath.Base(f.Name()), + int(dir.Fd()), filepath.Base(path), unix.AT_SYMLINK_FOLLOW) + if err != nil { + return fmt.Errorf("link tmpfile: %w", err) + } + + return nil +} diff --git a/go.mod b/go.mod index 333440b..7f851aa 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,9 @@ require ( github.com/aws/aws-sdk-go-v2 v1.18.0 github.com/aws/aws-sdk-go-v2/service/s3 v1.33.1 github.com/gofiber/fiber/v2 v2.45.0 + github.com/google/uuid v1.3.0 + github.com/pkg/xattr v0.4.9 + golang.org/x/sys v0.8.0 ) require ( @@ -19,7 +22,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.2 // indirect github.com/aws/smithy-go v1.13.5 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/klauspost/compress v1.16.5 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.18 // indirect @@ -32,5 +34,4 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.47.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect - golang.org/x/sys v0.8.0 // indirect ) diff --git a/go.sum b/go.sum index 90e3ec6..6ab0e03 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= +github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= +github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= @@ -86,6 +88,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= From 3559592fcd19700a02a73d45ab03bcfaf3709ace Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Fri, 19 May 2023 19:19:42 -0700 Subject: [PATCH 3/3] posix: initial object requests --- backend/posix/posix.go | 358 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 317 insertions(+), 41 deletions(-) diff --git a/backend/posix/posix.go b/backend/posix/posix.go index 3fb1f12..15c28e0 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -32,6 +32,7 @@ const ( metaTmpMultipartDir = metaTmpDir + "/multipart" onameAttr = "user.objname" tagHdr = "X-Amz-Tagging" + dirObjKey = "user.dirisobject" ) var ( @@ -227,36 +228,9 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [ } } - userDefined := make(map[string]string) + userMetaData := make(map[string]string) upiddir := filepath.Join(objdir, uploadID) - fillUserDefined(upiddir, userDefined) - - b, err := xattr.Get(upiddir, "user."+tagHdr) - tags := string(b) - if err != nil { - tags = "" - } - if tags != "" { - userDefined[tagHdr] = tags - } - - b, err = xattr.Get(upiddir, "user.content-type") - contentType := string(b) - if err != nil { - contentType = "" - } - if contentType != "" { - userDefined["content-type"] = contentType - } - - b, err = xattr.Get(upiddir, "user.content-encoding") - contentEncoding := string(b) - if err != nil { - contentEncoding = "" - } - if contentEncoding != "" { - userDefined["content-encoding"] = contentEncoding - } + loadUserMetaData(upiddir, userMetaData) objname := filepath.Join(bucket, object) dir := filepath.Dir(objname) @@ -275,7 +249,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [ return nil, fmt.Errorf("link object in namespace: %w", err) } - for k, v := range userDefined { + for k, v := range userMetaData { err = xattr.Set(objname, "user."+k, []byte(v)) if err != nil { // cleanup object if returning error @@ -330,7 +304,7 @@ func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, return sum, nil } -func fillUserDefined(path string, m map[string]string) { +func loadUserMetaData(path string, m map[string]string) (tag, contentType, contentEncoding string) { ents, err := xattr.List(path) if err != nil || len(ents) == 0 { return @@ -349,21 +323,44 @@ func fillUserDefined(path string, m map[string]string) { } m[strings.TrimPrefix(e, "user.")] = string(b) } + + b, err := xattr.Get(path, "user."+tagHdr) + tag = string(b) + if err != nil { + tag = "" + } + if tag != "" { + m[tagHdr] = tag + } + + b, err = xattr.Get(path, "user.content-type") + contentType = string(b) + if err != nil { + contentType = "" + } + if contentType != "" { + m["content-type"] = contentType + } + + b, err = xattr.Get(path, "user.content-encoding") + contentEncoding = string(b) + if err != nil { + contentEncoding = "" + } + if contentEncoding != "" { + m["content-encoding"] = contentEncoding + } + + return } func isValidMeta(val string) bool { if strings.HasPrefix(val, "user.X-Amz-Meta") { return true } - if strings.HasPrefix(val, "user.x-amz-meta") { - return true - } if strings.EqualFold(val, "user.Expires") { return true } - if strings.EqualFold(val, "user.expires") { - return true - } return false } @@ -525,9 +522,9 @@ func (p *Posix) ListMultipartUploads(mpu *s3.ListMultipartUploadsInput) (*s3.Lis continue } - userDefined := make(map[string]string) + userMetaData := make(map[string]string) upiddir := filepath.Join(bucket, metaTmpMultipartDir, obj.Name(), upid.Name()) - fillUserDefined(upiddir, userDefined) + loadUserMetaData(upiddir, userMetaData) uploadID := upid.Name() uploads = append(uploads, types.MultipartUpload{ @@ -627,9 +624,9 @@ func (p *Posix) ListObjectParts(bucket, object, uploadID string, partNumberMarke nextpart = parts[len(parts)-1].PartNumber } - userDefined := make(map[string]string) + userMetaData := make(map[string]string) upiddir := filepath.Join(objdir, uploadID) - fillUserDefined(upiddir, userDefined) + loadUserMetaData(upiddir, userMetaData) return &s3.ListPartsOutput{ Bucket: &bucket, @@ -684,3 +681,282 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, r io.Re return etag, nil } + +func (p *Posix) PutObject(bucket, object string, r io.Reader) (string, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return "", s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return "", fmt.Errorf("stat bucket: %w", err) + } + + name := filepath.Join(bucket, object) + + etag := "" + + if strings.HasSuffix(object, "/") { + // object is directory + err = mkdirAll(name, os.FileMode(0755), bucket, object) + if err != nil { + return "", err + } + + // TODO: set user attrs + //for k, v := range metadata { + // xattr.Set(name, "user."+k, []byte(v)) + //} + + // set our tag that this dir was specifically put + xattr.Set(name, dirObjKey, nil) + } else { + // object is file + f, err := openTmpFile(".") + if err != nil { + return "", fmt.Errorf("open temp file: %w", err) + } + defer f.Close() + + // TODO: fallocate based on content length + + hash := md5.New() + rdr := io.TeeReader(r, hash) + _, err = io.Copy(f, rdr) + if err != nil { + f.Close() + return "", fmt.Errorf("write object data: %w", err) + } + dir := filepath.Dir(name) + if dir != "" { + err = mkdirAll(dir, os.FileMode(0755), bucket, object) + if err != nil { + f.Close() + return "", fmt.Errorf("make object parent directories: %w", err) + } + } + + err = linkTmpFile(f, name) + if err != nil { + return "", fmt.Errorf("link object in namespace: %w", err) + } + + // TODO: set user attrs + //for k, v := range metadata { + // xattr.Set(name, "user."+k, []byte(v)) + //} + + dataSum := hash.Sum(nil) + etag := hex.EncodeToString(dataSum[:]) + xattr.Set(name, "user.etag", []byte(etag)) + + if newObjUID != 0 || newObjGID != 0 { + err = os.Chown(name, newObjUID, newObjGID) + if err != nil { + return "", fmt.Errorf("set object uid/gid: %v", err) + } + } + } + + return etag, nil +} + +func (p *Posix) DeleteObject(bucket, object string) error { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return fmt.Errorf("stat bucket: %w", err) + } + + os.Remove(filepath.Join(bucket, object)) + if err != nil && os.IsNotExist(err) { + return s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return fmt.Errorf("delete object: %w", err) + } + + return p.removeParents(bucket, object) +} + +func (p *Posix) removeParents(bucket, object string) error { + // this will remove all parent directories that were not + // specifically uploaded with a put object. we detect + // this with a special xattr to indicate these. stop + // at either the bucket or the first parent we encounter + // with the xattr, whichever comes first. + objPath := filepath.Join(bucket, object) + + for { + parent := filepath.Dir(objPath) + + if filepath.Base(parent) == bucket { + break + } + + _, err := xattr.Get(parent, dirObjKey) + if err == nil { + break + } + + err = os.Remove(parent) + if err != nil { + break + } + + objPath = parent + } + return nil +} + +func (p *Posix) DeleteObjects(bucket string, objects *s3.DeleteObjectsInput) error { + // delete object already checks bucket + for _, obj := range objects.Delete.Objects { + err := p.DeleteObject(bucket, *obj.Key) + if err != nil { + return err + } + } + + return nil +} + +func (p *Posix) GetObject(bucket, object string, startOffset, length int64, writer io.Writer, etag string) (*s3.GetObjectOutput, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + objPath := filepath.Join(bucket, object) + fi, err := os.Stat(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("stat object: %w", err) + } + + if startOffset+length > fi.Size() { + // TODO: is ErrInvalidRequest correct here? + return nil, s3err.GetAPIError(s3err.ErrInvalidRequest) + } + + f, err := os.Open(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("open object: %w", err) + } + defer f.Close() + + rdr := io.NewSectionReader(f, startOffset, length) + _, err = io.Copy(writer, rdr) + if err != nil { + return nil, fmt.Errorf("copy data: %w", err) + } + + userMetaData := make(map[string]string) + + _, contentType, contentEncoding := loadUserMetaData(objPath, userMetaData) + + // TODO: fill range request header? + // TODO: parse tags for tag count? + return &s3.GetObjectOutput{ + ContentLength: length, + ContentEncoding: &contentEncoding, + ContentType: &contentType, + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + Metadata: userMetaData, + }, nil +} + +func (p *Posix) HeadObject(bucket, object string, etag string) (*s3.HeadObjectOutput, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + objPath := filepath.Join(bucket, object) + fi, err := os.Stat(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("stat object: %w", err) + } + + userMetaData := make(map[string]string) + _, contentType, contentEncoding := loadUserMetaData(filepath.Join(bucket, objPath), userMetaData) + + // TODO: fill accept ranges request header? + // TODO: do we need to get etag from xattr? + return &s3.HeadObjectOutput{ + ContentLength: fi.Size(), + ContentType: &contentType, + ContentEncoding: &contentEncoding, + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + Metadata: userMetaData, + }, nil +} + +func (p *Posix) CopyObject(srcBucket, srcObject, DstBucket, dstObject string) (*s3.CopyObjectOutput, error) { + _, err := os.Stat(srcBucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + _, err = os.Stat(DstBucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + objPath := filepath.Join(srcBucket, srcObject) + f, err := os.Open(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("stat object: %w", err) + } + defer f.Close() + + etag, err := p.PutObject(DstBucket, dstObject, f) + if err != nil { + return nil, err + } + + fi, err := os.Stat(filepath.Join(DstBucket, dstObject)) + if err != nil { + return nil, fmt.Errorf("stat dst object: %w", err) + } + + return &s3.CopyObjectOutput{ + CopyObjectResult: &types.CopyObjectResult{ + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + }, + }, nil +} + +func (p *Posix) ListObjects(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListBucketsOutput, error) { + return nil, s3err.GetAPIError(s3err.ErrNotImplemented) +} +func (p *Posix) ListObjectsV2(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListBucketsOutput, error) { + return nil, s3err.GetAPIError(s3err.ErrNotImplemented) +}