From 576dfc5884e28769660c3ff60ea81ed4bdbc22a1 Mon Sep 17 00:00:00 2001
From: Ben McClelland
Date: Mon, 10 Jun 2024 17:42:55 -0700
Subject: [PATCH] fix: correct metadata, tags, and lock info for scoutfs
 multipart objects

Add meta.MetadataStorer support to scoutfs so that scoutfs uses the
same metadata interface as posix. This fixes metadata retrieval for
multipart objects and adds the recently introduced object lock
compatibility as well.
---
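[Editor's note: commentary between the "---" marker and the diffstat is
ignored by git am.] The change reworks scoutfs to call through the
meta.MetadataStorer interface instead of raw xattr calls. The sketch
below shows the interface surface as inferred from this patch's call
sites only; it is an assumption for illustration, and the authoritative
definition lives in backend/meta:

	package meta

	import "errors"

	// ErrNoSuchKey reports that the requested attribute does not exist.
	var ErrNoSuchKey = errors.New("no such key")

	// MetadataStorer is the bucket/object attribute store shared by the
	// posix and scoutfs backends (method set inferred from this patch).
	type MetadataStorer interface {
		// RetrieveAttribute returns the stored value for the named attribute.
		RetrieveAttribute(bucket, object, attribute string) ([]byte, error)
		// StoreAttribute sets the named attribute to the given value.
		StoreAttribute(bucket, object, attribute string, value []byte) error
		// ListAttributes lists the attribute names present on the object.
		ListAttributes(bucket, object string) ([]string, error)
	}

Since New() passes meta.XattrMeta{} as the storer, attributes presumably
still land in "user." extended attributes; that would explain why
etagkey drops the "user." prefix below, the prefix becoming the storer's
concern rather than the caller's.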
 backend/posix/posix.go            |  10 +-
 backend/scoutfs/scoutfs.go        | 278 +++++++++++++++++++++++-------
 backend/scoutfs/scoutfs_compat.go |  19 +-
 3 files changed, 227 insertions(+), 80 deletions(-)

diff --git a/backend/posix/posix.go b/backend/posix/posix.go
index 2d893e9..42c1dae 100644
--- a/backend/posix/posix.go
+++ b/backend/posix/posix.go
@@ -509,6 +509,10 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
 	partsize := int64(0)
 	var totalsize int64
 	for i, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
 		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
 		fullPartPath := filepath.Join(bucket, partObjPath)
 		fi, err := os.Lstat(fullPartPath)
@@ -530,7 +534,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
 		if err != nil {
 			etag = ""
 		}
-		if etag != *parts[i].ETag {
+		if parts[i].ETag == nil || etag != *parts[i].ETag {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
 	}
@@ -546,6 +550,10 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
 	defer f.cleanup()

 	for _, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
 		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
 		fullPartPath := filepath.Join(bucket, partObjPath)
 		pf, err := os.Open(fullPartPath)
diff --git a/backend/scoutfs/scoutfs.go b/backend/scoutfs/scoutfs.go
index b5c065e..b0a38eb 100644
--- a/backend/scoutfs/scoutfs.go
+++ b/backend/scoutfs/scoutfs.go
@@ -26,12 +26,14 @@ import (
 	"path/filepath"
 	"strings"
 	"syscall"
+	"time"

 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/pkg/xattr"
 	"github.com/versity/versitygw/auth"
 	"github.com/versity/versitygw/backend"
+	"github.com/versity/versitygw/backend/meta"
 	"github.com/versity/versitygw/backend/posix"
 	"github.com/versity/versitygw/s3err"
 )
@@ -47,6 +49,9 @@ type ScoutFS struct {
 	rootfd  *os.File
 	rootdir string

+	// bucket/object metadata storage facility
+	meta meta.MetadataStorer
+
 	// glaciermode enables the following behavior:
 	// GET object: if file offline, return invalid object state
 	// HEAD object: if file offline, set obj storage class to GLACIER
@@ -75,8 +80,13 @@ const (
 	metaTmpDir          = ".sgwtmp"
 	metaTmpMultipartDir = metaTmpDir + "/multipart"
 	tagHdr              = "X-Amz-Tagging"
+	metaHdr             = "X-Amz-Meta"
+	contentTypeHdr      = "content-type"
+	contentEncHdr       = "content-encoding"
 	emptyMD5            = "d41d8cd98f00b204e9800998ecf8427e"
-	etagkey             = "user.etag"
+	etagkey             = "etag"
+	objectRetentionKey  = "object-retention"
+	objectLegalHoldKey  = "object-legal-hold"
 )

 var (
@@ -87,11 +97,12 @@ var (

 const (
 	// ScoutFS special xattr types
 	systemPrefix = "scoutfs.hide."
 	onameAttr    = systemPrefix + "objname"
 	flagskey     = systemPrefix + "sam_flags"
 	stagecopykey = systemPrefix + "sam_stagereq"
+
+	fsBlocksize = 4096
 )

 const (
@@ -179,18 +190,20 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 		return nil, err
 	}

-	objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
+	objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))

 	// check all parts ok
 	last := len(parts) - 1
 	partsize := int64(0)
 	var totalsize int64
-	for i, p := range parts {
-		if p.PartNumber == nil {
+	for i, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
-		partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *p.PartNumber))
-		fi, err := os.Lstat(partPath)
+
+		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
+		fullPartPath := filepath.Join(bucket, partObjPath)
+		fi, err := os.Lstat(fullPartPath)
 		if err != nil {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
@@ -198,23 +211,25 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 		if i == 0 {
 			partsize = fi.Size()
 		}
+
+		// partsize must be a multiple of the filesystem blocksize
+		// except for the last part
+		if i < last && partsize%fsBlocksize != 0 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
 		totalsize += fi.Size()
 		// all parts except the last need to be the same size
 		if i < last && partsize != fi.Size() {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
-		// non-last part sizes need to be multiples of 4k for move blocks
-		// TODO: fallback to no move blocks if not 4k aligned?
-		if i == 0 && i < last && fi.Size()%4096 != 0 {
-			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
-		}

-		b, err := xattr.Get(partPath, "user.etag")
+		b, err := s.meta.RetrieveAttribute(bucket, partObjPath, etagkey)
 		etag := string(b)
 		if err != nil {
 			etag = ""
 		}
-		if etag != *parts[i].ETag {
+		if parts[i].ETag == nil || etag != *parts[i].ETag {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
 	}
@@ -230,10 +245,16 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 	}
 	defer f.cleanup()

-	for _, p := range parts {
-		pf, err := os.Open(filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *p.PartNumber)))
+	for _, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
+		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
+		fullPartPath := filepath.Join(bucket, partObjPath)
+		pf, err := os.Open(fullPartPath)
 		if err != nil {
-			return nil, fmt.Errorf("open part %v: %v", *p.PartNumber, err)
+			return nil, fmt.Errorf("open part %v: %v", *part.PartNumber, err)
 		}

 		// scoutfs move data is a metadata only operation that moves the data
 		err = moveData(pf, f.f)
 		pf.Close()
 		if err != nil {
-			return nil, fmt.Errorf("move blocks part %v: %v", *p.PartNumber, err)
+			return nil, fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
 		}
 	}

 	userMetaData := make(map[string]string)
 	upiddir := filepath.Join(objdir, uploadID)
-	loadUserMetaData(upiddir, userMetaData)
+	cType, _ := s.loadUserMetaData(bucket, upiddir, userMetaData)

 	objname := filepath.Join(bucket, object)
 	dir := filepath.Dir(objname)
@@ -265,7 +286,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 	}

 	for k, v := range userMetaData {
-		err = xattr.Set(objname, "user."+k, []byte(v))
+		err = s.meta.StoreAttribute(bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
 		if err != nil {
 			// cleanup object if returning error
 			os.Remove(objname)
 		}
 	}

@@ -273,10 +294,58 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
+	// load and set tagging
+	tagging, err := s.meta.RetrieveAttribute(bucket, upiddir, tagHdr)
+	if err == nil {
+		if err := s.meta.StoreAttribute(bucket, object, tagHdr, tagging); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object tagging: %w", err)
+		}
+	}
+	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+		return nil, fmt.Errorf("get object tagging: %w", err)
+	}
+
+	// set content-type
+	if cType != "" {
+		if err := s.meta.StoreAttribute(bucket, object, contentTypeHdr, []byte(cType)); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object content type: %w", err)
+		}
+	}
+
+	// load and set legal hold
+	lHold, err := s.meta.RetrieveAttribute(bucket, upiddir, objectLegalHoldKey)
+	if err == nil {
+		if err := s.meta.StoreAttribute(bucket, object, objectLegalHoldKey, lHold); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object legal hold: %w", err)
+		}
+	}
+	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+		return nil, fmt.Errorf("get object legal hold: %w", err)
+	}
+
+	// load and set retention
+	ret, err := s.meta.RetrieveAttribute(bucket, upiddir, objectRetentionKey)
+	if err == nil {
+		if err := s.meta.StoreAttribute(bucket, object, objectRetentionKey, ret); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object retention: %w", err)
+		}
+	}
+	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+		return nil, fmt.Errorf("get object retention: %w", err)
+	}
+
 	// Calculate s3 compatible md5sum for complete multipart.
 	s3MD5 := backend.GetMultipartMD5(parts)

-	err = xattr.Set(objname, "user.etag", []byte(s3MD5))
+	err = s.meta.StoreAttribute(bucket, object, etagkey, []byte(s3MD5))
 	if err != nil {
 		// cleanup object if returning error
 		os.Remove(objname)
@@ -310,61 +379,104 @@ func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte
 	return sum, nil
 }

-func loadUserMetaData(path string, m map[string]string) (contentType, contentEncoding string) {
-	ents, err := xattr.List(path)
+// fill out the user metadata map with the metadata for the object
+// and return the content type and encoding
+func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) (string, string) {
+	ents, err := s.meta.ListAttributes(bucket, object)
 	if err != nil || len(ents) == 0 {
-		return
+		return "", ""
 	}

 	for _, e := range ents {
 		if !isValidMeta(e) {
 			continue
 		}
-		b, err := xattr.Get(path, e)
-		if err == errNoData {
-			m[strings.TrimPrefix(e, "user.")] = ""
-			continue
-		}
+		b, err := s.meta.RetrieveAttribute(bucket, object, e)
 		if err != nil {
 			continue
 		}
-		m[strings.TrimPrefix(e, "user.")] = string(b)
+		if b == nil {
+			m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = ""
+			continue
+		}
+		m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = string(b)
 	}

-	b, err := xattr.Get(path, "user.content-type")
+	var contentType, contentEncoding string
+	b, _ := s.meta.RetrieveAttribute(bucket, object, contentTypeHdr)
 	contentType = string(b)
-	if err != nil {
-		contentType = ""
-	}
 	if contentType != "" {
-		m["content-type"] = contentType
+		m[contentTypeHdr] = contentType
 	}

-	b, err = xattr.Get(path, "user.content-encoding")
+	b, _ = s.meta.RetrieveAttribute(bucket, object, contentEncHdr)
 	contentEncoding = string(b)
-	if err != nil {
-		contentEncoding = ""
-	}
 	if contentEncoding != "" {
-		m["content-encoding"] = contentEncoding
+		m[contentEncHdr] = contentEncoding
 	}

-	return
+	return contentType, contentEncoding
 }

 func isValidMeta(val string) bool {
-	if strings.HasPrefix(val, "user.X-Amz-Meta") {
+	if strings.HasPrefix(val, metaHdr) {
 		return true
 	}
-	if strings.EqualFold(val, "user.Expires") {
+	if strings.EqualFold(val, "Expires") {
 		return true
 	}
 	return false
 }

-func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
+func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
+	if input.Bucket == nil {
+		return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
+	}
+	if input.Key == nil {
+		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
+	}
 	bucket := *input.Bucket
 	object := *input.Key

+	if input.PartNumber != nil {
+		uploadId, sum, err := s.retrieveUploadId(bucket, object)
+		if err != nil {
+			return nil, err
+		}
+
+		ents, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId))
+		if errors.Is(err, fs.ErrNotExist) {
+			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
+		}
+		if err != nil {
+			return nil, fmt.Errorf("read parts: %w", err)
+		}
+
+		partPath := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber))
+
+		part, err := os.Stat(filepath.Join(bucket, partPath))
+		if errors.Is(err, fs.ErrNotExist) {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+		if err != nil {
+			return nil, fmt.Errorf("stat part: %w", err)
+		}
+
+		b, err := s.meta.RetrieveAttribute(bucket, partPath, etagkey)
+		etag := string(b)
+		if err != nil {
+			etag = ""
+		}
+		partsCount := int32(len(ents))
+		size := part.Size()
+
+		return &s3.HeadObjectOutput{
+			LastModified:  backend.GetTimePtr(part.ModTime()),
+			ETag:          &etag,
+			PartsCount:    &partsCount,
+			ContentLength: &size,
+		}, nil
+	}
+
 	_, err := os.Stat(bucket)
 	if errors.Is(err, fs.ErrNotExist) {
 		return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
@@ -383,9 +495,14 @@ func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.
 	}

 	userMetaData := make(map[string]string)
-	contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
+	contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)

-	b, err := xattr.Get(objPath, etagkey)
+	if fi.IsDir() {
+		// this is the media type for directories in AWS and Nextcloud
+		contentType = "application/x-directory"
+	}
+
+	b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
 	etag := string(b)
 	if err != nil {
 		etag = ""
 	}
@@ -424,18 +541,54 @@ func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.

 	contentLength := fi.Size()

+	var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
+	status, err := s.Posix.GetObjectLegalHold(ctx, bucket, object, "")
+	if err == nil {
+		if *status {
+			objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOn
+		} else {
+			objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOff
+		}
+	}
+
+	var objectLockMode types.ObjectLockMode
+	var objectLockRetainUntilDate *time.Time
+	retention, err := s.Posix.GetObjectRetention(ctx, bucket, object, "")
+	if err == nil {
+		var config types.ObjectLockRetention
+		if err := json.Unmarshal(retention, &config); err == nil {
+			objectLockMode = types.ObjectLockMode(config.Mode)
+			objectLockRetainUntilDate = config.RetainUntilDate
+		}
+	}
+
 	return &s3.HeadObjectOutput{
-		ContentLength:   &contentLength,
-		ContentType:     &contentType,
-		ContentEncoding: &contentEncoding,
-		ETag:            &etag,
-		LastModified:    backend.GetTimePtr(fi.ModTime()),
-		Metadata:        userMetaData,
-		StorageClass:    stclass,
-		Restore:         &requestOngoing,
+		ContentLength:             &contentLength,
+		ContentType:               &contentType,
+		ContentEncoding:           &contentEncoding,
+		ETag:                      &etag,
+		LastModified:              backend.GetTimePtr(fi.ModTime()),
+		Metadata:                  userMetaData,
+		StorageClass:              stclass,
+		Restore:                   &requestOngoing,
+		ObjectLockLegalHoldStatus: objectLockLegalHoldStatus,
+		ObjectLockMode:            objectLockMode,
+		ObjectLockRetainUntilDate: objectLockRetainUntilDate,
 	}, nil
 }

+func (s *ScoutFS) retrieveUploadId(bucket, object string) (string, [32]byte, error) {
+	sum := sha256.Sum256([]byte(object))
+	objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
+
+	entries, err := os.ReadDir(objdir)
+	if err != nil || len(entries) == 0 {
+		return "", [32]byte{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
+	}
+
+	return entries[0].Name(), sum, nil
+}
+
 func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer io.Writer) (*s3.GetObjectOutput, error) {
 	bucket := *input.Bucket
 	object := *input.Key
@@ -515,9 +668,9 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer

 	userMetaData := make(map[string]string)
-	contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
+	contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)

-	b, err := xattr.Get(objPath, etagkey)
+	b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
 	etag := string(b)
 	if err != nil {
 		etag = ""
 	}
@@ -671,14 +824,11 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
 		if d.IsDir() {
 			// directory object only happens if directory empty
 			// check to see if this is a directory object by checking etag
-			etagBytes, err := xattr.Get(objPath, etagkey)
-			if isNoAttr(err) || errors.Is(err, fs.ErrNotExist) {
-				return types.Object{}, backend.ErrSkipObj
-			}
+			b, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
 			if err != nil {
 				return types.Object{}, fmt.Errorf("get etag: %w", err)
 			}
-			etag := string(etagBytes)
+			etag := string(b)

 			fi, err := d.Info()
 			if errors.Is(err, fs.ErrNotExist) {
@@ -698,14 +848,14 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
 		}

 		// file object, get object info and fill out object data
-		etagBytes, err := xattr.Get(objPath, etagkey)
+		b, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
 		if errors.Is(err, fs.ErrNotExist) {
 			return types.Object{}, backend.ErrSkipObj
 		}
-		if err != nil && !isNoAttr(err) {
+		if err != nil {
 			return types.Object{}, fmt.Errorf("get etag: %w", err)
 		}
-		etag := string(etagBytes)
+		etag := string(b)

 		fi, err := d.Info()
 		if errors.Is(err, fs.ErrNotExist) {
diff --git a/backend/scoutfs/scoutfs_compat.go b/backend/scoutfs/scoutfs_compat.go
index f6a640f..48fc9f1 100644
--- a/backend/scoutfs/scoutfs_compat.go
+++ b/backend/scoutfs/scoutfs_compat.go
@@ -23,7 +23,6 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
-	"syscall"

 	"golang.org/x/sys/unix"

@@ -35,7 +34,9 @@ import (
...
 )

 func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
-	p, err := posix.New(rootdir, meta.XattrMeta{}, posix.PosixOpts{
+	metastore := meta.XattrMeta{}
+
+	p, err := posix.New(rootdir, metastore, posix.PosixOpts{
 		ChownUID: opts.ChownUID,
 		ChownGID: opts.ChownGID,
 	})
@@ -52,6 +53,7 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
 		Posix:    p,
 		rootfd:   f,
 		rootdir:  rootdir,
+		meta:     metastore,
 		chownuid: opts.ChownUID,
 		chowngid: opts.ChownGID,
 	}, nil
@@ -100,11 +102,6 @@ func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Acc
 		gid: gid,
 	}

-	// falloc is best effort, its fine if this fails
-	if size > 0 {
-		tmp.falloc()
-	}
-
 	if doChown {
 		err := f.Chown(uid, gid)
 		if err != nil {
@@ -115,14 +112,6 @@ func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Acc
 	return tmp, nil
 }

-func (tmp *tmpfile) falloc() error {
-	err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, tmp.size)
-	if err != nil {
-		return fmt.Errorf("fallocate: %v", err)
-	}
-	return nil
-}
-
 func (tmp *tmpfile) link() error {
 	// We use Linkat/Rename as the atomic operation for object puts. The
 	// upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict
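-- 
Editor's note (below the signature delimiter, so it is not applied):
backend.GetMultipartMD5 computes the S3-compatible multipart ETag that
this patch now stores through the MetadataStorer. The helper itself is
not shown in the diff; the conventional S3 rule it is expected to follow
is the MD5 of the concatenated raw part MD5s, suffixed with "-<part
count>". A self-contained sketch of that rule (an assumption for
illustration, not the project's code or signature):

	package main

	import (
		"crypto/md5"
		"encoding/hex"
		"fmt"
		"strings"
	)

	// multipartETag follows the usual S3 convention: concatenate the raw
	// MD5 digest of every part, MD5 the result, and append "-<parts>".
	func multipartETag(partETags []string) string {
		var concat []byte
		for _, e := range partETags {
			// part ETags arrive as (possibly quoted) hex MD5 strings
			raw, _ := hex.DecodeString(strings.Trim(e, `"`))
			concat = append(concat, raw...)
		}
		sum := md5.Sum(concat)
		return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:]), len(partETags))
	}

	func main() {
		// example: a two-part upload where both parts hash to the empty MD5
		fmt.Println(multipartETag([]string{
			"d41d8cd98f00b204e9800998ecf8427e",
			"d41d8cd98f00b204e9800998ecf8427e",
		}))
	}

This is also why CompleteMultipartUpload above rejects a nil part ETag:
the final ETag is derived from the per-part ETags rather than from the
assembled file.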