posix: initial object requests

This commit is contained in:
Ben McClelland
2023-05-19 19:19:42 -07:00
parent 0b09f9d92d
commit 3559592fcd

View File

@@ -32,6 +32,7 @@ const (
metaTmpMultipartDir = metaTmpDir + "/multipart"
onameAttr = "user.objname"
tagHdr = "X-Amz-Tagging"
dirObjKey = "user.dirisobject"
)
var (
@@ -227,36 +228,9 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [
}
}
userDefined := make(map[string]string)
userMetaData := make(map[string]string)
upiddir := filepath.Join(objdir, uploadID)
fillUserDefined(upiddir, userDefined)
b, err := xattr.Get(upiddir, "user."+tagHdr)
tags := string(b)
if err != nil {
tags = ""
}
if tags != "" {
userDefined[tagHdr] = tags
}
b, err = xattr.Get(upiddir, "user.content-type")
contentType := string(b)
if err != nil {
contentType = ""
}
if contentType != "" {
userDefined["content-type"] = contentType
}
b, err = xattr.Get(upiddir, "user.content-encoding")
contentEncoding := string(b)
if err != nil {
contentEncoding = ""
}
if contentEncoding != "" {
userDefined["content-encoding"] = contentEncoding
}
loadUserMetaData(upiddir, userMetaData)
objname := filepath.Join(bucket, object)
dir := filepath.Dir(objname)
@@ -275,7 +249,7 @@ func (p *Posix) CompleteMultipartUpload(bucket, object, uploadID string, parts [
return nil, fmt.Errorf("link object in namespace: %w", err)
}
for k, v := range userDefined {
for k, v := range userMetaData {
err = xattr.Set(objname, "user."+k, []byte(v))
if err != nil {
// cleanup object if returning error
@@ -330,7 +304,7 @@ func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte,
return sum, nil
}
func fillUserDefined(path string, m map[string]string) {
func loadUserMetaData(path string, m map[string]string) (tag, contentType, contentEncoding string) {
ents, err := xattr.List(path)
if err != nil || len(ents) == 0 {
return
@@ -349,21 +323,44 @@ func fillUserDefined(path string, m map[string]string) {
}
m[strings.TrimPrefix(e, "user.")] = string(b)
}
b, err := xattr.Get(path, "user."+tagHdr)
tag = string(b)
if err != nil {
tag = ""
}
if tag != "" {
m[tagHdr] = tag
}
b, err = xattr.Get(path, "user.content-type")
contentType = string(b)
if err != nil {
contentType = ""
}
if contentType != "" {
m["content-type"] = contentType
}
b, err = xattr.Get(path, "user.content-encoding")
contentEncoding = string(b)
if err != nil {
contentEncoding = ""
}
if contentEncoding != "" {
m["content-encoding"] = contentEncoding
}
return
}
func isValidMeta(val string) bool {
if strings.HasPrefix(val, "user.X-Amz-Meta") {
return true
}
if strings.HasPrefix(val, "user.x-amz-meta") {
return true
}
if strings.EqualFold(val, "user.Expires") {
return true
}
if strings.EqualFold(val, "user.expires") {
return true
}
return false
}
@@ -525,9 +522,9 @@ func (p *Posix) ListMultipartUploads(mpu *s3.ListMultipartUploadsInput) (*s3.Lis
continue
}
userDefined := make(map[string]string)
userMetaData := make(map[string]string)
upiddir := filepath.Join(bucket, metaTmpMultipartDir, obj.Name(), upid.Name())
fillUserDefined(upiddir, userDefined)
loadUserMetaData(upiddir, userMetaData)
uploadID := upid.Name()
uploads = append(uploads, types.MultipartUpload{
@@ -627,9 +624,9 @@ func (p *Posix) ListObjectParts(bucket, object, uploadID string, partNumberMarke
nextpart = parts[len(parts)-1].PartNumber
}
userDefined := make(map[string]string)
userMetaData := make(map[string]string)
upiddir := filepath.Join(objdir, uploadID)
fillUserDefined(upiddir, userDefined)
loadUserMetaData(upiddir, userMetaData)
return &s3.ListPartsOutput{
Bucket: &bucket,
@@ -684,3 +681,282 @@ func (p *Posix) PutObjectPart(bucket, object, uploadID string, part int, r io.Re
return etag, nil
}
func (p *Posix) PutObject(bucket, object string, r io.Reader) (string, error) {
_, err := os.Stat(bucket)
if err != nil && os.IsNotExist(err) {
return "", s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return "", fmt.Errorf("stat bucket: %w", err)
}
name := filepath.Join(bucket, object)
etag := ""
if strings.HasSuffix(object, "/") {
// object is directory
err = mkdirAll(name, os.FileMode(0755), bucket, object)
if err != nil {
return "", err
}
// TODO: set user attrs
//for k, v := range metadata {
// xattr.Set(name, "user."+k, []byte(v))
//}
// set our tag that this dir was specifically put
xattr.Set(name, dirObjKey, nil)
} else {
// object is file
f, err := openTmpFile(".")
if err != nil {
return "", fmt.Errorf("open temp file: %w", err)
}
defer f.Close()
// TODO: fallocate based on content length
hash := md5.New()
rdr := io.TeeReader(r, hash)
_, err = io.Copy(f, rdr)
if err != nil {
f.Close()
return "", fmt.Errorf("write object data: %w", err)
}
dir := filepath.Dir(name)
if dir != "" {
err = mkdirAll(dir, os.FileMode(0755), bucket, object)
if err != nil {
f.Close()
return "", fmt.Errorf("make object parent directories: %w", err)
}
}
err = linkTmpFile(f, name)
if err != nil {
return "", fmt.Errorf("link object in namespace: %w", err)
}
// TODO: set user attrs
//for k, v := range metadata {
// xattr.Set(name, "user."+k, []byte(v))
//}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum[:])
xattr.Set(name, "user.etag", []byte(etag))
if newObjUID != 0 || newObjGID != 0 {
err = os.Chown(name, newObjUID, newObjGID)
if err != nil {
return "", fmt.Errorf("set object uid/gid: %v", err)
}
}
}
return etag, nil
}
func (p *Posix) DeleteObject(bucket, object string) error {
_, err := os.Stat(bucket)
if err != nil && os.IsNotExist(err) {
return s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return fmt.Errorf("stat bucket: %w", err)
}
os.Remove(filepath.Join(bucket, object))
if err != nil && os.IsNotExist(err) {
return s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return fmt.Errorf("delete object: %w", err)
}
return p.removeParents(bucket, object)
}
func (p *Posix) removeParents(bucket, object string) error {
// this will remove all parent directories that were not
// specifically uploaded with a put object. we detect
// this with a special xattr to indicate these. stop
// at either the bucket or the first parent we encounter
// with the xattr, whichever comes first.
objPath := filepath.Join(bucket, object)
for {
parent := filepath.Dir(objPath)
if filepath.Base(parent) == bucket {
break
}
_, err := xattr.Get(parent, dirObjKey)
if err == nil {
break
}
err = os.Remove(parent)
if err != nil {
break
}
objPath = parent
}
return nil
}
func (p *Posix) DeleteObjects(bucket string, objects *s3.DeleteObjectsInput) error {
// delete object already checks bucket
for _, obj := range objects.Delete.Objects {
err := p.DeleteObject(bucket, *obj.Key)
if err != nil {
return err
}
}
return nil
}
func (p *Posix) GetObject(bucket, object string, startOffset, length int64, writer io.Writer, etag string) (*s3.GetObjectOutput, error) {
_, err := os.Stat(bucket)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
}
objPath := filepath.Join(bucket, object)
fi, err := os.Stat(objPath)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return nil, fmt.Errorf("stat object: %w", err)
}
if startOffset+length > fi.Size() {
// TODO: is ErrInvalidRequest correct here?
return nil, s3err.GetAPIError(s3err.ErrInvalidRequest)
}
f, err := os.Open(objPath)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return nil, fmt.Errorf("open object: %w", err)
}
defer f.Close()
rdr := io.NewSectionReader(f, startOffset, length)
_, err = io.Copy(writer, rdr)
if err != nil {
return nil, fmt.Errorf("copy data: %w", err)
}
userMetaData := make(map[string]string)
_, contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
// TODO: fill range request header?
// TODO: parse tags for tag count?
return &s3.GetObjectOutput{
ContentLength: length,
ContentEncoding: &contentEncoding,
ContentType: &contentType,
ETag: &etag,
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: userMetaData,
}, nil
}
func (p *Posix) HeadObject(bucket, object string, etag string) (*s3.HeadObjectOutput, error) {
_, err := os.Stat(bucket)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
}
objPath := filepath.Join(bucket, object)
fi, err := os.Stat(objPath)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return nil, fmt.Errorf("stat object: %w", err)
}
userMetaData := make(map[string]string)
_, contentType, contentEncoding := loadUserMetaData(filepath.Join(bucket, objPath), userMetaData)
// TODO: fill accept ranges request header?
// TODO: do we need to get etag from xattr?
return &s3.HeadObjectOutput{
ContentLength: fi.Size(),
ContentType: &contentType,
ContentEncoding: &contentEncoding,
ETag: &etag,
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: userMetaData,
}, nil
}
func (p *Posix) CopyObject(srcBucket, srcObject, DstBucket, dstObject string) (*s3.CopyObjectOutput, error) {
_, err := os.Stat(srcBucket)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
}
_, err = os.Stat(DstBucket)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
}
objPath := filepath.Join(srcBucket, srcObject)
f, err := os.Open(objPath)
if err != nil && os.IsNotExist(err) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return nil, fmt.Errorf("stat object: %w", err)
}
defer f.Close()
etag, err := p.PutObject(DstBucket, dstObject, f)
if err != nil {
return nil, err
}
fi, err := os.Stat(filepath.Join(DstBucket, dstObject))
if err != nil {
return nil, fmt.Errorf("stat dst object: %w", err)
}
return &s3.CopyObjectOutput{
CopyObjectResult: &types.CopyObjectResult{
ETag: &etag,
LastModified: backend.GetTimePtr(fi.ModTime()),
},
}, nil
}
func (p *Posix) ListObjects(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListBucketsOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (p *Posix) ListObjectsV2(bucket, prefix, marker, delim string, maxkeys int) (*s3.ListBucketsOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}