diff --git a/backend/posix/posix.go b/backend/posix/posix.go
index 2d893e9..42c1dae 100644
--- a/backend/posix/posix.go
+++ b/backend/posix/posix.go
@@ -509,6 +509,10 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
 	partsize := int64(0)
 	var totalsize int64
 	for i, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
 		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
 		fullPartPath := filepath.Join(bucket, partObjPath)
 		fi, err := os.Lstat(fullPartPath)
@@ -530,7 +534,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
 		if err != nil {
 			etag = ""
 		}
-		if etag != *parts[i].ETag {
+		if parts[i].ETag == nil || etag != *parts[i].ETag {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
 	}
@@ -546,6 +550,10 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
 	defer f.cleanup()
 
 	for _, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
 		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
 		fullPartPath := filepath.Join(bucket, partObjPath)
 		pf, err := os.Open(fullPartPath)
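In aws-sdk-go-v2, the CompleteMultipartUpload part list arrives as types.CompletedPart values whose PartNumber (*int32) and ETag (*string) fields are pointers, so a malformed request can omit either one; the guards added above return ErrInvalidPart instead of dereferencing nil. A minimal stand-alone sketch of the same checks (the validatePart helper is illustrative, not part of this change):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// validatePart applies the same nil-safety rules the backend now
// enforces: a part is usable only if PartNumber and ETag are present
// and the part number is at least 1.
func validatePart(p types.CompletedPart) error {
	if p.PartNumber == nil || *p.PartNumber < 1 {
		return fmt.Errorf("invalid part number")
	}
	if p.ETag == nil {
		return fmt.Errorf("missing etag")
	}
	return nil
}

func main() {
	ok := types.CompletedPart{PartNumber: aws.Int32(1), ETag: aws.String("etagvalue")}
	bad := types.CompletedPart{} // nil PartNumber and ETag
	fmt.Println(validatePart(ok))  // <nil>
	fmt.Println(validatePart(bad)) // invalid part number
}
```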
diff --git a/backend/scoutfs/scoutfs.go b/backend/scoutfs/scoutfs.go
index b5c065e..b0a38eb 100644
--- a/backend/scoutfs/scoutfs.go
+++ b/backend/scoutfs/scoutfs.go
@@ -26,12 +26,14 @@ import (
 	"path/filepath"
 	"strings"
 	"syscall"
+	"time"
 
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/pkg/xattr"
 	"github.com/versity/versitygw/auth"
 	"github.com/versity/versitygw/backend"
+	"github.com/versity/versitygw/backend/meta"
 	"github.com/versity/versitygw/backend/posix"
 	"github.com/versity/versitygw/s3err"
 )
@@ -47,6 +49,9 @@ type ScoutFS struct {
 	rootfd  *os.File
 	rootdir string
 
+	// bucket/object metadata storage facility
+	meta meta.MetadataStorer
+
 	// glaciermode enables the following behavior:
 	// GET object: if file offline, return invalid object state
 	// HEAD object: if file offline, set obj storage class to GLACIER
@@ -75,8 +80,13 @@ const (
 	metaTmpDir          = ".sgwtmp"
 	metaTmpMultipartDir = metaTmpDir + "/multipart"
 	tagHdr              = "X-Amz-Tagging"
+	metaHdr             = "X-Amz-Meta"
+	contentTypeHdr      = "content-type"
+	contentEncHdr       = "content-encoding"
 	emptyMD5            = "d41d8cd98f00b204e9800998ecf8427e"
-	etagkey             = "user.etag"
+	etagkey             = "etag"
+	objectRetentionKey  = "object-retention"
+	objectLegalHoldKey  = "object-legal-hold"
 )
 
 var (
@@ -87,11 +97,12 @@ var (
 
 const (
 	// ScoutFS special xattr types
 	systemPrefix = "scoutfs.hide."
 	onameAttr    = systemPrefix + "objname"
 	flagskey     = systemPrefix + "sam_flags"
 	stagecopykey = systemPrefix + "sam_stagereq"
+
+	fsBlocksize = 4096
 )
 
 const (
@@ -179,18 +190,20 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 		return nil, err
 	}
 
-	objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
+	objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
 
 	// check all parts ok
 	last := len(parts) - 1
 	partsize := int64(0)
 	var totalsize int64
-	for i, p := range parts {
-		if p.PartNumber == nil {
+	for i, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
-		partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *p.PartNumber))
-		fi, err := os.Lstat(partPath)
+
+		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
+		fullPartPath := filepath.Join(bucket, partObjPath)
+		fi, err := os.Lstat(fullPartPath)
 		if err != nil {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
@@ -198,23 +211,25 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 		if i == 0 {
 			partsize = fi.Size()
 		}
+
+		// partsize must be a multiple of the filesystem blocksize
+		// except for last part
+		if i < last && partsize%fsBlocksize != 0 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
 		totalsize += fi.Size()
 		// all parts except the last need to be the same size
 		if i < last && partsize != fi.Size() {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
 
-		// non-last part sizes need to be multiples of 4k for move blocks
-		// TODO: fallback to no move blocks if not 4k aligned?
-		if i == 0 && i < last && fi.Size()%4096 != 0 {
-			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
-		}
-		b, err := xattr.Get(partPath, "user.etag")
+		b, err := s.meta.RetrieveAttribute(bucket, partObjPath, etagkey)
 		etag := string(b)
 		if err != nil {
 			etag = ""
 		}
-		if etag != *parts[i].ETag {
+		if parts[i].ETag == nil || etag != *parts[i].ETag {
 			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
 		}
 	}
@@ -230,10 +245,16 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 	}
 	defer f.cleanup()
 
-	for _, p := range parts {
-		pf, err := os.Open(filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *p.PartNumber)))
+	for _, part := range parts {
+		if part.PartNumber == nil || *part.PartNumber < 1 {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+
+		partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
+		fullPartPath := filepath.Join(bucket, partObjPath)
+		pf, err := os.Open(fullPartPath)
 		if err != nil {
-			return nil, fmt.Errorf("open part %v: %v", *p.PartNumber, err)
+			return nil, fmt.Errorf("open part %v: %v", *part.PartNumber, err)
 		}
 
 		// scoutfs move data is a metadata only operation that moves the data
@@ -242,13 +263,13 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 		err = moveData(pf, f.f)
 		pf.Close()
 		if err != nil {
-			return nil, fmt.Errorf("move blocks part %v: %v", *p.PartNumber, err)
+			return nil, fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
 		}
 	}
 
 	userMetaData := make(map[string]string)
 	upiddir := filepath.Join(objdir, uploadID)
-	loadUserMetaData(upiddir, userMetaData)
+	cType, _ := s.loadUserMetaData(bucket, upiddir, userMetaData)
 
 	objname := filepath.Join(bucket, object)
 	dir := filepath.Dir(objname)
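The scoutfs move-blocks path (moveData above) relinks extents instead of copying bytes, so it only works on block-aligned data. The old check tested 4 KiB alignment only for the first part (i == 0); the replacement applies the rule to every non-last part through the named fsBlocksize constant. A stand-alone sketch of the combined size rules, where sizes[i] holds the stat size of part i+1 (checkPartSizes is illustrative, not part of the diff):

```go
package main

import "fmt"

const fsBlocksize = 4096 // matches the constant introduced above

// checkPartSizes mirrors the validation in CompleteMultipartUpload:
// every part except the last must equal the first part's size, and
// that common size must be a multiple of the filesystem blocksize so
// move-blocks can relink whole extents.
func checkPartSizes(sizes []int64) error {
	last := len(sizes) - 1
	for i, sz := range sizes {
		if i == last {
			continue // the last part may be any size
		}
		if sz != sizes[0] {
			return fmt.Errorf("part %d: size %d differs from first part size %d", i+1, sz, sizes[0])
		}
		if sz%fsBlocksize != 0 {
			return fmt.Errorf("part %d: size %d is not a multiple of %d", i+1, sz, fsBlocksize)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkPartSizes([]int64{8192, 8192, 100})) // <nil>
	fmt.Println(checkPartSizes([]int64{5000, 5000, 100})) // alignment error
}
```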
@@ -265,7 +286,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 	}
 
 	for k, v := range userMetaData {
-		err = xattr.Set(objname, "user."+k, []byte(v))
+		err = s.meta.StoreAttribute(bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
 		if err != nil {
 			// cleanup object if returning error
 			os.Remove(objname)
@@ -273,10 +294,58 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
 		}
 	}
 
+	// load and set tagging
+	tagging, err := s.meta.RetrieveAttribute(bucket, upiddir, tagHdr)
+	if err == nil {
+		if err := s.meta.StoreAttribute(bucket, object, tagHdr, tagging); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object tagging: %w", err)
+		}
+	}
+	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+		return nil, fmt.Errorf("get object tagging: %w", err)
+	}
+
+	// set content-type
+	if cType != "" {
+		if err := s.meta.StoreAttribute(bucket, object, contentTypeHdr, []byte(cType)); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object content type: %w", err)
+		}
+	}
+
+	// load and set legal hold
+	lHold, err := s.meta.RetrieveAttribute(bucket, upiddir, objectLegalHoldKey)
+	if err == nil {
+		if err := s.meta.StoreAttribute(bucket, object, objectLegalHoldKey, lHold); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object legal hold: %w", err)
+		}
+	}
+	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+		return nil, fmt.Errorf("get object legal hold: %w", err)
+	}
+
+	// load and set retention
+	ret, err := s.meta.RetrieveAttribute(bucket, upiddir, objectRetentionKey)
+	if err == nil {
+		if err := s.meta.StoreAttribute(bucket, object, objectRetentionKey, ret); err != nil {
+			// cleanup object
+			os.Remove(objname)
+			return nil, fmt.Errorf("set object retention: %w", err)
+		}
+	}
+	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+		return nil, fmt.Errorf("get object retention: %w", err)
+	}
+
 	// Calculate s3 compatible md5sum for complete multipart.
 	s3MD5 := backend.GetMultipartMD5(parts)
 
-	err = xattr.Set(objname, "user.etag", []byte(s3MD5))
+	err = s.meta.StoreAttribute(bucket, object, etagkey, []byte(s3MD5))
 	if err != nil {
 		// cleanup object if returning error
 		os.Remove(objname)
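The three blocks above share one shape: read an attribute from the multipart upload directory, treat meta.ErrNoSuchKey as "attribute was never set", fail on any other retrieve error, and mirror the value onto the finished object with cleanup on a failed store. A hypothetical helper that would factor this out, assuming the MetadataStorer method signatures used in this diff:

```go
package scoutfs

import (
	"errors"
	"fmt"
	"os"

	"github.com/versity/versitygw/backend/meta"
)

// copyUploadAttr mirrors one attribute from the multipart upload
// directory (upiddir) onto the completed object, removing the object
// file on a failed store just as the inline blocks do. A missing
// attribute (meta.ErrNoSuchKey) is not an error.
func copyUploadAttr(store meta.MetadataStorer, bucket, upiddir, object, objname, key string) error {
	val, err := store.RetrieveAttribute(bucket, upiddir, key)
	if errors.Is(err, meta.ErrNoSuchKey) {
		return nil
	}
	if err != nil {
		return fmt.Errorf("get %v: %w", key, err)
	}
	if err := store.StoreAttribute(bucket, object, key, val); err != nil {
		os.Remove(objname)
		return fmt.Errorf("set %v: %w", key, err)
	}
	return nil
}
```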
@@ -310,61 +379,104 @@ func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte
 	return sum, nil
 }
 
-func loadUserMetaData(path string, m map[string]string) (contentType, contentEncoding string) {
-	ents, err := xattr.List(path)
+// fill out the user metadata map with the metadata for the object
+// and return the content type and encoding
+func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) (string, string) {
+	ents, err := s.meta.ListAttributes(bucket, object)
 	if err != nil || len(ents) == 0 {
-		return
+		return "", ""
 	}
 
 	for _, e := range ents {
 		if !isValidMeta(e) {
 			continue
 		}
-		b, err := xattr.Get(path, e)
-		if err == errNoData {
-			m[strings.TrimPrefix(e, "user.")] = ""
-			continue
-		}
+		b, err := s.meta.RetrieveAttribute(bucket, object, e)
 		if err != nil {
 			continue
 		}
-		m[strings.TrimPrefix(e, "user.")] = string(b)
+		if b == nil {
+			m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = ""
+			continue
+		}
+		m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = string(b)
 	}
 
-	b, err := xattr.Get(path, "user.content-type")
+	var contentType, contentEncoding string
+	b, _ := s.meta.RetrieveAttribute(bucket, object, contentTypeHdr)
 	contentType = string(b)
-	if err != nil {
-		contentType = ""
-	}
 	if contentType != "" {
-		m["content-type"] = contentType
+		m[contentTypeHdr] = contentType
 	}
 
-	b, err = xattr.Get(path, "user.content-encoding")
+	b, _ = s.meta.RetrieveAttribute(bucket, object, contentEncHdr)
 	contentEncoding = string(b)
-	if err != nil {
-		contentEncoding = ""
-	}
 	if contentEncoding != "" {
-		m["content-encoding"] = contentEncoding
+		m[contentEncHdr] = contentEncoding
 	}
 
-	return
+	return contentType, contentEncoding
 }
 
 func isValidMeta(val string) bool {
-	if strings.HasPrefix(val, "user.X-Amz-Meta") {
+	if strings.HasPrefix(val, metaHdr) {
 		return true
 	}
-	if strings.EqualFold(val, "user.Expires") {
+	if strings.EqualFold(val, "Expires") {
 		return true
 	}
 	return false
 }
 
-func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
+func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
+	if input.Bucket == nil {
+		return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
+	}
+	if input.Key == nil {
+		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
+	}
 	bucket := *input.Bucket
 	object := *input.Key
 
+	if input.PartNumber != nil {
+		uploadId, sum, err := s.retrieveUploadId(bucket, object)
+		if err != nil {
+			return nil, err
+		}
+
+		ents, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId))
+		if errors.Is(err, fs.ErrNotExist) {
+			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
+		}
+		if err != nil {
+			return nil, fmt.Errorf("read parts: %w", err)
+		}
+
+		partPath := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber))
+
+		part, err := os.Stat(filepath.Join(bucket, partPath))
+		if errors.Is(err, fs.ErrNotExist) {
+			return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+		}
+		if err != nil {
+			return nil, fmt.Errorf("stat part: %w", err)
+		}
+
+		b, err := s.meta.RetrieveAttribute(bucket, partPath, etagkey)
+		etag := string(b)
+		if err != nil {
+			etag = ""
+		}
+		partsCount := int32(len(ents))
+		size := part.Size()
+
+		return &s3.HeadObjectOutput{
+			LastModified:  backend.GetTimePtr(part.ModTime()),
+			ETag:          &etag,
+			PartsCount:    &partsCount,
+			ContentLength: &size,
+		}, nil
+	}
+
 	_, err := os.Stat(bucket)
 	if errors.Is(err, fs.ErrNotExist) {
 		return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
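The new input.PartNumber branch lets a HEAD request against an in-progress multipart upload report a single part's size, etag, and modification time along with the total PartsCount; note that retrieveUploadId (added later in this diff) resolves only the first upload ID listed for the object. A hedged client-side sketch with aws-sdk-go-v2 (bucket, key, and the default credential/endpoint configuration are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	// HEAD part 1 of an object with an in-flight multipart upload.
	out, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
		Bucket:     aws.String("mybucket"),
		Key:        aws.String("mykey"),
		PartNumber: aws.Int32(1),
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("etag=%s parts=%d size=%d\n",
		aws.ToString(out.ETag), aws.ToInt32(out.PartsCount), aws.ToInt64(out.ContentLength))
}
```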
@@ -383,9 +495,14 @@ func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.
 	}
 
 	userMetaData := make(map[string]string)
-	contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
+	contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)
 
-	b, err := xattr.Get(objPath, etagkey)
+	if fi.IsDir() {
+		// this is the media type for directories in AWS and Nextcloud
+		contentType = "application/x-directory"
+	}
+
+	b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
 	etag := string(b)
 	if err != nil {
 		etag = ""
@@ -424,18 +541,54 @@ func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.
 
 	contentLength := fi.Size()
 
+	var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
+	status, err := s.Posix.GetObjectLegalHold(ctx, bucket, object, "")
+	if err == nil {
+		if *status {
+			objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOn
+		} else {
+			objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOff
+		}
+	}
+
+	var objectLockMode types.ObjectLockMode
+	var objectLockRetainUntilDate *time.Time
+	retention, err := s.Posix.GetObjectRetention(ctx, bucket, object, "")
+	if err == nil {
+		var config types.ObjectLockRetention
+		if err := json.Unmarshal(retention, &config); err == nil {
+			objectLockMode = types.ObjectLockMode(config.Mode)
+			objectLockRetainUntilDate = config.RetainUntilDate
+		}
+	}
+
 	return &s3.HeadObjectOutput{
-		ContentLength:   &contentLength,
-		ContentType:     &contentType,
-		ContentEncoding: &contentEncoding,
-		ETag:            &etag,
-		LastModified:    backend.GetTimePtr(fi.ModTime()),
-		Metadata:        userMetaData,
-		StorageClass:    stclass,
-		Restore:         &requestOngoing,
+		ContentLength:             &contentLength,
+		ContentType:               &contentType,
+		ContentEncoding:           &contentEncoding,
+		ETag:                      &etag,
+		LastModified:              backend.GetTimePtr(fi.ModTime()),
+		Metadata:                  userMetaData,
+		StorageClass:              stclass,
+		Restore:                   &requestOngoing,
+		ObjectLockLegalHoldStatus: objectLockLegalHoldStatus,
+		ObjectLockMode:            objectLockMode,
+		ObjectLockRetainUntilDate: objectLockRetainUntilDate,
 	}, nil
 }
 
+func (s *ScoutFS) retrieveUploadId(bucket, object string) (string, [32]byte, error) {
+	sum := sha256.Sum256([]byte(object))
+	objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
+
+	entries, err := os.ReadDir(objdir)
+	if err != nil || len(entries) == 0 {
+		return "", [32]byte{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
+	}
+
+	return entries[0].Name(), sum, nil
+}
+
 func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer io.Writer) (*s3.GetObjectOutput, error) {
 	bucket := *input.Bucket
 	object := *input.Key
@@ -515,9 +668,9 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer
 
 	userMetaData := make(map[string]string)
 
-	contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
+	contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)
 
-	b, err := xattr.Get(objPath, etagkey)
+	b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
 	etag := string(b)
 	if err != nil {
 		etag = ""
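GetObjectRetention returns the raw bytes stored under the object-retention key, which this backend writes as a JSON-encoded types.ObjectLockRetention, so the HEAD path unmarshals the document and converts config.Mode (an ObjectLockRetentionMode) into the ObjectLockMode field of the response. A stand-alone sketch of that decoding step under the same JSON-encoding assumption:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// decodeRetention unpacks a stored retention document the way the
// HeadObject path above does.
func decodeRetention(raw []byte) (types.ObjectLockMode, *time.Time, error) {
	var cfg types.ObjectLockRetention
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return "", nil, fmt.Errorf("unmarshal retention: %w", err)
	}
	// ObjectLockRetentionMode and ObjectLockMode share the string
	// values GOVERNANCE and COMPLIANCE, so a direct conversion works.
	return types.ObjectLockMode(cfg.Mode), cfg.RetainUntilDate, nil
}

func main() {
	until := time.Now().Add(24 * time.Hour).UTC()
	raw, _ := json.Marshal(types.ObjectLockRetention{
		Mode:            types.ObjectLockRetentionModeGovernance,
		RetainUntilDate: &until,
	})
	mode, t, err := decodeRetention(raw)
	fmt.Println(mode, t, err) // GOVERNANCE <timestamp> <nil>
}
```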
@@ -671,14 +824,11 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
 		if d.IsDir() {
 			// directory object only happens if directory empty
 			// check to see if this is a directory object by checking etag
-			etagBytes, err := xattr.Get(objPath, etagkey)
-			if isNoAttr(err) || errors.Is(err, fs.ErrNotExist) {
-				return types.Object{}, backend.ErrSkipObj
-			}
+			b, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
 			if err != nil {
 				return types.Object{}, fmt.Errorf("get etag: %w", err)
 			}
-			etag := string(etagBytes)
+			etag := string(b)
 
 			fi, err := d.Info()
 			if errors.Is(err, fs.ErrNotExist) {
@@ -698,14 +848,14 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
 		}
 
 		// file object, get object info and fill out object data
-		etagBytes, err := xattr.Get(objPath, etagkey)
+		b, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
 		if errors.Is(err, fs.ErrNotExist) {
 			return types.Object{}, backend.ErrSkipObj
 		}
-		if err != nil && !isNoAttr(err) {
+		if err != nil {
 			return types.Object{}, fmt.Errorf("get etag: %w", err)
 		}
-		etag := string(etagBytes)
+		etag := string(b)
 
 		fi, err := d.Info()
 		if errors.Is(err, fs.ErrNotExist) {
diff --git a/backend/scoutfs/scoutfs_compat.go b/backend/scoutfs/scoutfs_compat.go
index f6a640f..48fc9f1 100644
--- a/backend/scoutfs/scoutfs_compat.go
+++ b/backend/scoutfs/scoutfs_compat.go
@@ -23,7 +23,6 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
-	"syscall"
 
 	"golang.org/x/sys/unix"
 
@@ -35,7 +34,9 @@ import (
 )
 
 func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
-	p, err := posix.New(rootdir, meta.XattrMeta{}, posix.PosixOpts{
+	metastore := meta.XattrMeta{}
+
+	p, err := posix.New(rootdir, metastore, posix.PosixOpts{
 		ChownUID: opts.ChownUID,
 		ChownGID: opts.ChownGID,
 	})
@@ -52,6 +53,7 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
 		Posix:    p,
 		rootfd:   f,
 		rootdir:  rootdir,
+		meta:     metastore,
 		chownuid: opts.ChownUID,
 		chowngid: opts.ChownGID,
 	}, nil
@@ -100,11 +102,6 @@ func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Acc
 		gid:     gid,
 	}
 
-	// falloc is best effort, its fine if this fails
-	if size > 0 {
-		tmp.falloc()
-	}
-
 	if doChown {
 		err := f.Chown(uid, gid)
 		if err != nil {
@@ -115,14 +112,6 @@ func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Acc
 	return tmp, nil
 }
 
-func (tmp *tmpfile) falloc() error {
-	err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, tmp.size)
-	if err != nil {
-		return fmt.Errorf("fallocate: %v", err)
-	}
-	return nil
-}
-
 func (tmp *tmpfile) link() error {
 	// We use Linkat/Rename as the atomic operation for object puts. The
 	// upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict
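New now keeps the same meta.XattrMeta value it hands to posix.New, so attributes written by the embedded posix layer (etags, tagging, lock state) are visible through s.meta and vice versa. The tmpfile fallocate path is dropped along with its syscall import, presumably because multipart data reaches the final file through the metadata-only move-blocks path rather than a preallocated copy. A small sketch of the shared store under the signatures used in this diff; the bucket and object paths are illustrative and must already exist on an xattr-capable filesystem:

```go
package main

import (
	"fmt"
	"log"

	"github.com/versity/versitygw/backend/meta"
)

func main() {
	// The same value that New passes to posix.New and keeps in s.meta.
	store := meta.XattrMeta{}

	// Store an etag the way the gateway does, then read it back.
	err := store.StoreAttribute("mybucket", "myobject", "etag",
		[]byte("d41d8cd98f00b204e9800998ecf8427e"))
	if err != nil {
		log.Fatal(err)
	}
	b, err := store.RetrieveAttribute("mybucket", "myobject", "etag")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b))
}
```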