diff --git a/backend/posix/posix.go b/backend/posix/posix.go index 3fb1f12..15c28e0 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -32,6 +32,7 @@ const ( metaTmpMultipartDir = metaTmpDir + "/multipart" onameAttr = "user.objname" tagHdr = "X-Amz-Tagging" + dirObjKey = "user.dirisobject" ) var ( @@ -227,36 +228,9 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [ } } - userDefined := make(map[string]string) + userMetaData := make(map[string]string) upiddir := filepath.Join(objdir, uploadID) - fillUserDefined(upiddir, userDefined) - - b, err := xattr.Get(upiddir, "user."+tagHdr) - tags := string(b) - if err != nil { - tags = "" - } - if tags != "" { - userDefined[tagHdr] = tags - } - - b, err = xattr.Get(upiddir, "user.content-type") - contentType := string(b) - if err != nil { - contentType = "" - } - if contentType != "" { - userDefined["content-type"] = contentType - } - - b, err = xattr.Get(upiddir, "user.content-encoding") - contentEncoding := string(b) - if err != nil { - contentEncoding = "" - } - if contentEncoding != "" { - userDefined["content-encoding"] = contentEncoding - } + loadUserMetaData(upiddir, userMetaData) objname := filepath.Join(bucket, object) dir := filepath.Dir(objname) @@ -275,7 +249,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [ return nil, fmt.Errorf("link object in namespace: %w", err) } - for k, v := range userDefined { + for k, v := range userMetaData { err = xattr.Set(objname, "user."+k, []byte(v)) if err != nil { // cleanup object if returning error @@ -330,7 +304,7 @@ func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, return sum, nil } -func fillUserDefined(path string, m map[string]string) { +func loadUserMetaData(path string, m map[string]string) (tag, contentType, contentEncoding string) { ents, err := xattr.List(path) if err != nil || len(ents) == 0 { return @@ -349,21 +323,44 @@ func fillUserDefined(path string, m map[string]string) { } m[strings.TrimPrefix(e, "user.")] = string(b) } + + b, err := xattr.Get(path, "user."+tagHdr) + tag = string(b) + if err != nil { + tag = "" + } + if tag != "" { + m[tagHdr] = tag + } + + b, err = xattr.Get(path, "user.content-type") + contentType = string(b) + if err != nil { + contentType = "" + } + if contentType != "" { + m["content-type"] = contentType + } + + b, err = xattr.Get(path, "user.content-encoding") + contentEncoding = string(b) + if err != nil { + contentEncoding = "" + } + if contentEncoding != "" { + m["content-encoding"] = contentEncoding + } + + return } func isValidMeta(val string) bool { if strings.HasPrefix(val, "user.X-Amz-Meta") { return true } - if strings.HasPrefix(val, "user.x-amz-meta") { - return true - } if strings.EqualFold(val, "user.Expires") { return true } - if strings.EqualFold(val, "user.expires") { - return true - } return false } @@ -525,9 +522,9 @@ func (p *Posix) ListMultipartUploads(mpu *s3.ListMultipartUploadsInput) (*s3.Lis continue } - userDefined := make(map[string]string) + userMetaData := make(map[string]string) upiddir := filepath.Join(bucket, metaTmpMultipartDir, obj.Name(), upid.Name()) - fillUserDefined(upiddir, userDefined) + loadUserMetaData(upiddir, userMetaData) uploadID := upid.Name() uploads = append(uploads, types.MultipartUpload{ @@ -627,9 +624,9 @@ func (p *Posix) ListObjectParts(bucket, object, uploadID string, partNumberMarke nextpart = parts[len(parts)-1].PartNumber } - userDefined := make(map[string]string) + userMetaData := make(map[string]string) upiddir := filepath.Join(objdir, uploadID) - fillUserDefined(upiddir, userDefined) + loadUserMetaData(upiddir, userMetaData) return &s3.ListPartsOutput{ Bucket: &bucket, @@ -684,3 +681,282 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, r io.Re return etag, nil } + +func (p *Posix) PutObject(bucket, object string, r io.Reader) (string, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return "", s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return "", fmt.Errorf("stat bucket: %w", err) + } + + name := filepath.Join(bucket, object) + + etag := "" + + if strings.HasSuffix(object, "/") { + // object is directory + err = mkdirAll(name, os.FileMode(0755), bucket, object) + if err != nil { + return "", err + } + + // TODO: set user attrs + //for k, v := range metadata { + // xattr.Set(name, "user."+k, []byte(v)) + //} + + // set our tag that this dir was specifically put + xattr.Set(name, dirObjKey, nil) + } else { + // object is file + f, err := openTmpFile(".") + if err != nil { + return "", fmt.Errorf("open temp file: %w", err) + } + defer f.Close() + + // TODO: fallocate based on content length + + hash := md5.New() + rdr := io.TeeReader(r, hash) + _, err = io.Copy(f, rdr) + if err != nil { + f.Close() + return "", fmt.Errorf("write object data: %w", err) + } + dir := filepath.Dir(name) + if dir != "" { + err = mkdirAll(dir, os.FileMode(0755), bucket, object) + if err != nil { + f.Close() + return "", fmt.Errorf("make object parent directories: %w", err) + } + } + + err = linkTmpFile(f, name) + if err != nil { + return "", fmt.Errorf("link object in namespace: %w", err) + } + + // TODO: set user attrs + //for k, v := range metadata { + // xattr.Set(name, "user."+k, []byte(v)) + //} + + dataSum := hash.Sum(nil) + etag := hex.EncodeToString(dataSum[:]) + xattr.Set(name, "user.etag", []byte(etag)) + + if newObjUID != 0 || newObjGID != 0 { + err = os.Chown(name, newObjUID, newObjGID) + if err != nil { + return "", fmt.Errorf("set object uid/gid: %v", err) + } + } + } + + return etag, nil +} + +func (p *Posix) DeleteObject(bucket, object string) error { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return fmt.Errorf("stat bucket: %w", err) + } + + os.Remove(filepath.Join(bucket, object)) + if err != nil && os.IsNotExist(err) { + return s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return fmt.Errorf("delete object: %w", err) + } + + return p.removeParents(bucket, object) +} + +func (p *Posix) removeParents(bucket, object string) error { + // this will remove all parent directories that were not + // specifically uploaded with a put object. we detect + // this with a special xattr to indicate these. stop + // at either the bucket or the first parent we encounter + // with the xattr, whichever comes first. + objPath := filepath.Join(bucket, object) + + for { + parent := filepath.Dir(objPath) + + if filepath.Base(parent) == bucket { + break + } + + _, err := xattr.Get(parent, dirObjKey) + if err == nil { + break + } + + err = os.Remove(parent) + if err != nil { + break + } + + objPath = parent + } + return nil +} + +func (p *Posix) DeleteObjects(bucket string, objects *s3.DeleteObjectsInput) error { + // delete object already checks bucket + for _, obj := range objects.Delete.Objects { + err := p.DeleteObject(bucket, *obj.Key) + if err != nil { + return err + } + } + + return nil +} + +func (p *Posix) GetObject(bucket, object string, startOffset, length int64, writer io.Writer, etag string) (*s3.GetObjectOutput, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + objPath := filepath.Join(bucket, object) + fi, err := os.Stat(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("stat object: %w", err) + } + + if startOffset+length > fi.Size() { + // TODO: is ErrInvalidRequest correct here? + return nil, s3err.GetAPIError(s3err.ErrInvalidRequest) + } + + f, err := os.Open(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("open object: %w", err) + } + defer f.Close() + + rdr := io.NewSectionReader(f, startOffset, length) + _, err = io.Copy(writer, rdr) + if err != nil { + return nil, fmt.Errorf("copy data: %w", err) + } + + userMetaData := make(map[string]string) + + _, contentType, contentEncoding := loadUserMetaData(objPath, userMetaData) + + // TODO: fill range request header? + // TODO: parse tags for tag count? + return &s3.GetObjectOutput{ + ContentLength: length, + ContentEncoding: &contentEncoding, + ContentType: &contentType, + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + Metadata: userMetaData, + }, nil +} + +func (p *Posix) HeadObject(bucket, object string, etag string) (*s3.HeadObjectOutput, error) { + _, err := os.Stat(bucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + objPath := filepath.Join(bucket, object) + fi, err := os.Stat(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("stat object: %w", err) + } + + userMetaData := make(map[string]string) + _, contentType, contentEncoding := loadUserMetaData(filepath.Join(bucket, objPath), userMetaData) + + // TODO: fill accept ranges request header? + // TODO: do we need to get etag from xattr? + return &s3.HeadObjectOutput{ + ContentLength: fi.Size(), + ContentType: &contentType, + ContentEncoding: &contentEncoding, + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + Metadata: userMetaData, + }, nil +} + +func (p *Posix) CopyObject(srcBucket, srcObject, DstBucket, dstObject string) (*s3.CopyObjectOutput, error) { + _, err := os.Stat(srcBucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + _, err = os.Stat(DstBucket) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket) + } + if err != nil { + return nil, fmt.Errorf("stat bucket: %w", err) + } + + objPath := filepath.Join(srcBucket, srcObject) + f, err := os.Open(objPath) + if err != nil && os.IsNotExist(err) { + return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) + } + if err != nil { + return nil, fmt.Errorf("stat object: %w", err) + } + defer f.Close() + + etag, err := p.PutObject(DstBucket, dstObject, f) + if err != nil { + return nil, err + } + + fi, err := os.Stat(filepath.Join(DstBucket, dstObject)) + if err != nil { + return nil, fmt.Errorf("stat dst object: %w", err) + } + + return &s3.CopyObjectOutput{ + CopyObjectResult: &types.CopyObjectResult{ + ETag: &etag, + LastModified: backend.GetTimePtr(fi.ModTime()), + }, + }, nil +} + +func (p *Posix) ListObjects(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListBucketsOutput, error) { + return nil, s3err.GetAPIError(s3err.ErrNotImplemented) +} +func (p *Posix) ListObjectsV2(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListBucketsOutput, error) { + return nil, s3err.GetAPIError(s3err.ErrNotImplemented) +}