diff --git a/backend/posix/posix.go b/backend/posix/posix.go index c20c228..11f154a 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -39,6 +39,7 @@ import ( "github.com/versity/versitygw/auth" "github.com/versity/versitygw/backend" "github.com/versity/versitygw/backend/meta" + "github.com/versity/versitygw/s3api/debuglogger" "github.com/versity/versitygw/s3api/utils" "github.com/versity/versitygw/s3err" "github.com/versity/versitygw/s3response" @@ -82,8 +83,8 @@ type Posix struct { var _ backend.Backend = &Posix{} const ( - metaTmpDir = ".sgwtmp" - metaTmpMultipartDir = metaTmpDir + "/multipart" + MetaTmpDir = ".sgwtmp" + MetaTmpMultipartDir = MetaTmpDir + "/multipart" onameAttr = "objname" tagHdr = "X-Amz-Tagging" metaHdr = "X-Amz-Meta" @@ -432,7 +433,7 @@ func (p *Posix) isBucketEmpty(bucket string) error { return fmt.Errorf("readdir bucket: %w", err) } if err == nil { - if len(ents) == 1 && ents[0].Name() != metaTmpDir { + if len(ents) == 1 && ents[0].Name() != MetaTmpDir { return s3err.GetAPIError(s3err.ErrVersionedBucketNotEmpty) } else if len(ents) > 1 { return s3err.GetAPIError(s3err.ErrVersionedBucketNotEmpty) @@ -447,7 +448,7 @@ func (p *Posix) isBucketEmpty(bucket string) error { if errors.Is(err, fs.ErrNotExist) { return s3err.GetAPIError(s3err.ErrNoSuchBucket) } - if len(ents) == 1 && ents[0].Name() != metaTmpDir { + if len(ents) == 1 && ents[0].Name() != MetaTmpDir { return s3err.GetAPIError(s3err.ErrBucketNotEmpty) } else if len(ents) > 1 { return s3err.GetAPIError(s3err.ErrBucketNotEmpty) @@ -693,7 +694,7 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun versionBucketPath := filepath.Join(p.versioningDir, bucket) versioningKey := filepath.Join(genObjVersionKey(key), versionId) - versionTmpPath := filepath.Join(versionBucketPath, metaTmpDir) + versionTmpPath := filepath.Join(versionBucketPath, MetaTmpDir) f, err := p.openTmpFile(versionTmpPath, versionBucketPath, versioningKey, size, acc, doFalloc, p.forceNoTmpFile) if err != nil { @@ -764,7 +765,7 @@ func (p *Posix) ListObjectVersions(ctx context.Context, input *s3.ListObjectVers fileSystem := os.DirFS(bucket) results, err := backend.WalkVersions(ctx, fileSystem, prefix, delim, keyMarker, versionIdMarker, max, - p.fileToObjVersions(bucket), []string{metaTmpDir}) + p.fileToObjVersions(bucket), []string{MetaTmpDir}) if err != nil { return s3response.ListVersionsResult{}, fmt.Errorf("walk %v: %w", bucket, err) } @@ -1210,7 +1211,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create objNameSum := sha256.Sum256([]byte(*mpu.Key)) // multiple uploads for same object name allowed, // they will all go into the same hashed name directory - objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", objNameSum)) + objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", objNameSum)) tmppath := filepath.Join(bucket, objdir) // the unique upload id is a directory for all of the parts // associated with this specific multipart upload @@ -1360,6 +1361,10 @@ func getPartChecksum(algo types.ChecksumAlgorithm, part types.CompletedPart) str } func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) { + return p.CompleteMultipartUploadWithCopy(ctx, input, nil) +} + +func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.CompleteMultipartUploadInput, customMove func(from *os.File, to *os.File) error) 
(s3response.CompleteMultipartUploadResult, string, error) { acct, ok := ctx.Value("account").(auth.Account) if !ok { acct = auth.Account{} } @@ -1395,7 +1400,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM return res, "", err } - objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) checksums, err := p.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID)) if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { @@ -1497,7 +1502,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM } } - f, err := p.openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, + f, err := p.openTmpFile(filepath.Join(bucket, MetaTmpDir), bucket, object, totalsize, acct, skipFalloc, p.forceNoTmpFile) if err != nil { if errors.Is(err, syscall.EDQUOT) { @@ -1550,7 +1555,16 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM } } - _, err = io.Copy(f.File(), rdr) + if customMove != nil { + err = customMove(pf, f.File()) + if err != nil { + // Fall back to a standard copy + debuglogger.Logf("Custom data block move failed (%v), falling back to io.Copy()", err) + _, err = io.Copy(f.File(), rdr) + } + } else { + _, err = io.Copy(f.File(), rdr) + } pf.Close() if err != nil { if errors.Is(err, syscall.EDQUOT) { @@ -1824,7 +1838,7 @@ func numberOfChecksums(part types.CompletedPart) int { func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) { sum := sha256.Sum256([]byte(object)) - objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) _, err := os.Stat(filepath.Join(objdir, uploadID)) if errors.Is(err, fs.ErrNotExist) { @@ -1838,7 +1852,7 @@ func (p *Posix) checkUploadIDExists(bucket, object, uploadID string, func (p *Posix) retrieveUploadId(bucket, object string) (string, [32]byte, error) { sum := sha256.Sum256([]byte(object)) - objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) entries, err := os.ReadDir(objdir) if err != nil || len(entries) == 0 { @@ -1990,7 +2004,7 @@ func (p *Posix) AbortMultipartUpload(_ context.Context, mpu *s3.AbortMultipartUp } sum := sha256.Sum256([]byte(object)) - objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) _, err = os.Stat(filepath.Join(objdir, uploadID)) if err != nil { @@ -2028,7 +2042,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl } // ignore readdir error and use the empty list returned - objs, _ := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir)) + objs, _ := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir)) var uploads []s3response.Upload var resultUpds []s3response.Upload @@ -2048,7 +2062,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl continue } - b, err := p.meta.RetrieveAttribute(nil, bucket, filepath.Join(metaTmpMultipartDir, obj.Name()), onameAttr) + b, err := p.meta.RetrieveAttribute(nil, bucket, filepath.Join(MetaTmpMultipartDir, obj.Name()), onameAttr) if err != nil { continue } @@ -2057,7 +2071,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl continue } - upids, err := os.ReadDir(filepath.Join(bucket,
metaTmpMultipartDir, obj.Name())) + upids, err := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir, obj.Name())) if err != nil { continue } @@ -2084,7 +2098,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl keyMarkerInd = len(uploads) } - checksum, err := p.retrieveChecksums(nil, bucket, filepath.Join(metaTmpMultipartDir, obj.Name(), uploadID)) + checksum, err := p.retrieveChecksums(nil, bucket, filepath.Join(MetaTmpMultipartDir, obj.Name(), uploadID)) if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { return lmu, fmt.Errorf("get mp checksum: %w", err) } @@ -2200,7 +2214,7 @@ func (p *Posix) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3resp return lpr, err } - objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) tmpdir := filepath.Join(bucket, objdir) ents, err := os.ReadDir(filepath.Join(tmpdir, uploadID)) @@ -2337,7 +2351,7 @@ func (p *Posix) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3. } sum := sha256.Sum256([]byte(object)) - objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) mpPath := filepath.Join(objdir, uploadID) _, err = os.Stat(filepath.Join(bucket, mpPath)) @@ -2511,7 +2525,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) } sum := sha256.Sum256([]byte(*upi.Key)) - objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum)) + objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) _, err = os.Stat(filepath.Join(*upi.Bucket, objdir, *upi.UploadId)) if errors.Is(err, fs.ErrNotExist) { @@ -2821,7 +2835,7 @@ func (p *Posix) PutObject(ctx context.Context, po s3response.PutObjectInput) (s3 return s3response.PutObjectOutput{}, fmt.Errorf("stat object: %w", err) } - f, err := p.openTmpFile(filepath.Join(*po.Bucket, metaTmpDir), + f, err := p.openTmpFile(filepath.Join(*po.Bucket, MetaTmpDir), *po.Bucket, *po.Key, contentLength, acct, doFalloc, p.forceNoTmpFile) if err != nil { if errors.Is(err, syscall.EDQUOT) { @@ -3175,7 +3189,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) ( acct = auth.Account{} } - f, err := p.openTmpFile(filepath.Join(bucket, metaTmpDir), + f, err := p.openTmpFile(filepath.Join(bucket, MetaTmpDir), bucket, object, srcObjVersion.Size(), acct, doFalloc, p.forceNoTmpFile) if err != nil { @@ -3641,7 +3655,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. return nil, err } - ents, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId)) + ents, err := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId)) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3649,7 +3663,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. 
return nil, fmt.Errorf("read parts: %w", err) } - partPath := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber)) + partPath := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber)) part, err := os.Stat(filepath.Join(bucket, partPath)) if errors.Is(err, fs.ErrNotExist) { @@ -4201,6 +4215,10 @@ func (p *Posix) CopyObject(ctx context.Context, input s3response.CopyObjectInput } func (p *Posix) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) { + return p.ListObjectsParametrized(ctx, input, p.FileToObj) +} + +func (p *Posix) ListObjectsParametrized(ctx context.Context, input *s3.ListObjectsInput, customFileToObj func(string, bool) backend.GetObjFunc) (s3response.ListObjectsResult, error) { bucket := *input.Bucket prefix := "" if input.Prefix != nil { @@ -4229,7 +4247,7 @@ func (p *Posix) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3 fileSystem := os.DirFS(bucket) results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, maxkeys, - p.fileToObj(bucket, true), []string{metaTmpDir}) + customFileToObj(bucket, true), []string{MetaTmpDir}) if err != nil { return s3response.ListObjectsResult{}, fmt.Errorf("walk %v: %w", bucket, err) } @@ -4247,7 +4265,7 @@ func (p *Posix) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3 }, nil } -func (p *Posix) fileToObj(bucket string, fetchOwner bool) backend.GetObjFunc { +func (p *Posix) FileToObj(bucket string, fetchOwner bool) backend.GetObjFunc { return func(path string, d fs.DirEntry) (s3response.Object, error) { var owner *types.Owner // Retreive the object owner data from bucket ACL, if fetchOwner is true @@ -4350,6 +4368,10 @@ func (p *Posix) fileToObj(bucket string, fetchOwner bool) backend.GetObjFunc { } func (p *Posix) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error) { + return p.ListObjectsV2Parametrized(ctx, input, p.FileToObj) +} + +func (p *Posix) ListObjectsV2Parametrized(ctx context.Context, input *s3.ListObjectsV2Input, customFileToObj func(string, bool) backend.GetObjFunc) (s3response.ListObjectsV2Result, error) { bucket := *input.Bucket prefix := "" if input.Prefix != nil { @@ -4386,7 +4408,7 @@ func (p *Posix) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) fileSystem := os.DirFS(bucket) results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, maxkeys, - p.fileToObj(bucket, fetchOwner), []string{metaTmpDir}) + customFileToObj(bucket, fetchOwner), []string{MetaTmpDir}) if err != nil { return s3response.ListObjectsV2Result{}, fmt.Errorf("walk %v: %w", bucket, err) } diff --git a/backend/posix/with_otmpfile.go b/backend/posix/with_otmpfile.go index 77f4f86..33d8b43 100644 --- a/backend/posix/with_otmpfile.go +++ b/backend/posix/with_otmpfile.go @@ -26,6 +26,7 @@ import ( "path/filepath" "strconv" "syscall" + "time" "github.com/versity/versitygw/auth" "github.com/versity/versitygw/backend" @@ -165,14 +166,10 @@ func (tmp *tmpfile) link() error { // of last upload completed wins and is not some combination of writes // from simultaneous uploads. 
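The hunk below changes link() so that an existing object is replaced by renaming a freshly linked temporary name over it, rather than removing the old file and retrying the link: rename(2) replaces the destination name atomically, so a concurrent reader sees either the old or the new object, never a missing one. A minimal, illustrative sketch of the same publish-by-rename idea using portable os calls (the helper name below is hypothetical, not versitygw code):

// Sketch: publish data by writing a unique temp file in the target
// directory and renaming it over the destination in one atomic step.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func publishAtomically(dir, name string, data []byte) error {
	tmp, err := os.CreateTemp(dir, ".sgwtmp.*") // unique temp name in the same dir
	if err != nil {
		return fmt.Errorf("create temp: %w", err)
	}
	defer os.Remove(tmp.Name()) // no-op once the rename has succeeded

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return fmt.Errorf("write temp: %w", err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("close temp: %w", err)
	}
	// Unlike link(2)/linkat(2), rename(2) may replace an existing
	// destination, and it does so atomically.
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

func main() {
	if err := publishAtomically(".", "object.dat", []byte("hello")); err != nil {
		fmt.Println("publish failed:", err)
	}
}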
objPath := filepath.Join(tmp.bucket, tmp.objname) - err := os.Remove(objPath) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("remove stale path: %w", err) - } dir := filepath.Dir(objPath) - err = backend.MkdirAll(dir, tmp.uid, tmp.gid, tmp.needsChown, tmp.newDirPerm) + err := backend.MkdirAll(dir, tmp.uid, tmp.gid, tmp.needsChown, tmp.newDirPerm) if err != nil { return fmt.Errorf("make parent dir: %w", err) } @@ -194,21 +191,33 @@ func (tmp *tmpfile) link() error { } defer dirf.Close() - for { - err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), - int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW) - if errors.Is(err, syscall.EEXIST) { - err := os.Remove(objPath) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("remove stale path: %w", err) + err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), + int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW) + if errors.Is(err, syscall.EEXIST) { + // Linkat cannot overwrite an existing name, so link the tmpfile to a unique temporary name first + // and then Renameat it over the destination to avoid a remove/link race condition + retries := 1 + for { + tmpName := fmt.Sprintf(".%s.sgwtmp.%d", filepath.Base(objPath), time.Now().UnixNano()) + err := unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), + int(dirf.Fd()), tmpName, unix.AT_SYMLINK_FOLLOW) + if errors.Is(err, syscall.EEXIST) && retries < 3 { + retries++ + continue } - continue + if err != nil { + return fmt.Errorf("cannot find free temporary file: %w", err) + } + + err = unix.Renameat(int(dirf.Fd()), tmpName, int(dirf.Fd()), filepath.Base(objPath)) + if err != nil { + return fmt.Errorf("overwriting renameat failed: %w", err) + } + break } - if err != nil { - return fmt.Errorf("link tmpfile (fd %q as %q): %w", - filepath.Base(tmp.f.Name()), objPath, err) - } - break + } else if err != nil { + return fmt.Errorf("link tmpfile (fd %q as %q): %w", + filepath.Base(tmp.f.Name()), objPath, err) } err = tmp.f.Close() diff --git a/backend/scoutfs/scoutfs.go b/backend/scoutfs/scoutfs.go index ddffd06..aac820b 100644 --- a/backend/scoutfs/scoutfs.go +++ b/backend/scoutfs/scoutfs.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "io/fs" - "net/http" "os" "path/filepath" "strings" @@ -30,11 +29,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/pkg/xattr" - "github.com/versity/versitygw/auth" "github.com/versity/versitygw/backend" - "github.com/versity/versitygw/backend/meta" "github.com/versity/versitygw/backend/posix" - "github.com/versity/versitygw/s3api/utils" "github.com/versity/versitygw/s3err" "github.com/versity/versitygw/s3response" ) @@ -53,9 +49,6 @@ type ScoutFS struct { rootfd *os.File rootdir string - // bucket/object metadata storage facility - meta meta.MetadataStorer - // glaciermode enables the following behavior: // GET object: if file offline, return invalid object state // HEAD object: if file offline, set obj storage class to GLACIER @@ -67,19 +60,6 @@ type ScoutFS struct { // RestoreObject: add batch stage request to file glaciermode bool - // chownuid/gid enable chowning of files to the account uid/gid - // when objects are uploaded - chownuid bool - chowngid bool - - // euid/egid are the effective uid/gid of the running versitygw process - // used to determine if chowning is needed - euid int - egid int - - // newDirPerm is the permissions to use when creating new directories - newDirPerm fs.FileMode - - // disableNoArchive is used to disable setting
scoutam noarchive flag // on mutlipart parts. This is enabled by default to prevent archive // copies of temporary multipart parts. @@ -89,24 +69,6 @@ type ScoutFS struct { var _ backend.Backend = &ScoutFS{} const ( - metaTmpDir = ".sgwtmp" - metaTmpMultipartDir = metaTmpDir + "/multipart" - tagHdr = "X-Amz-Tagging" - metaHdr = "X-Amz-Meta" - contentTypeHdr = "content-type" - contentEncHdr = "content-encoding" - contentLangHdr = "content-language" - contentDispHdr = "content-disposition" - cacheCtrlHdr = "cache-control" - expiresHdr = "expires" - emptyMD5 = "d41d8cd98f00b204e9800998ecf8427e" - etagkey = "etag" - checksumsKey = "checksums" - objectRetentionKey = "object-retention" - objectLegalHoldKey = "object-legal-hold" -) - -var ( stageComplete = "ongoing-request=\"false\", expiry-date=\"Fri, 2 Dec 2050 00:00:00 GMT\"" stageInProgress = "true" stageNotInProgress = "false" @@ -146,25 +108,6 @@ func (*ScoutFS) String() string { return "ScoutFS Gateway" } -// getChownIDs returns the uid and gid that should be used for chowning -// the object to the account uid/gid. It also returns a boolean indicating -// if chowning is needed. -func (s *ScoutFS) getChownIDs(acct auth.Account) (int, int, bool) { - uid := s.euid - gid := s.egid - var needsChown bool - if s.chownuid && acct.UserID != s.euid { - uid = acct.UserID - needsChown = true - } - if s.chowngid && acct.GroupID != s.egid { - gid = acct.GroupID - needsChown = true - } - - return uid, gid, needsChown -} - func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) { out, err := s.Posix.UploadPart(ctx, input) if err != nil { @@ -175,7 +118,7 @@ func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s sum := sha256.Sum256([]byte(*input.Key)) partPath := filepath.Join( *input.Bucket, // bucket - metaTmpMultipartDir, // temp multipart dir + posix.MetaTmpMultipartDir, // temp multipart dir fmt.Sprintf("%x", sum), // hashed objname *input.UploadId, // upload id fmt.Sprintf("%v", *input.PartNumber), // part number @@ -194,443 +137,7 @@ func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s // ioctl to not have to read and copy the part data to the final object. This // saves a read and write cycle for all mutlipart uploads. 
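With this change the ScoutFS backend no longer carries its own copy of this routine; the hunk below reduces it to a delegation into the new posix CompleteMultipartUploadWithCopy, passing moveData as the custom data mover. A minimal sketch of how any backend that embeds posix.Posix might plug in its own mover (the package, type, and fastMove names here are hypothetical); when the mover returns an error, the posix code logs it and falls back to io.Copy:

// Sketch: a hypothetical backend supplying its own part-data mover to
// posix.CompleteMultipartUploadWithCopy.
package mybackend

import (
	"context"
	"os"

	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/versity/versitygw/backend/posix"
	"github.com/versity/versitygw/s3response"
)

type MyFS struct {
	*posix.Posix
}

// fastMove stands in for a filesystem-specific, zero-copy data move
// (for example an ioctl). Returning an error triggers the io.Copy fallback.
func fastMove(from *os.File, to *os.File) error {
	return os.ErrInvalid // pretend the fast path is unavailable
}

func (m *MyFS) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
	return m.Posix.CompleteMultipartUploadWithCopy(ctx, input, fastMove)
}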
func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) { - acct, ok := ctx.Value("account").(auth.Account) - if !ok { - acct = auth.Account{} - } - - var res s3response.CompleteMultipartUploadResult - - if input.Key == nil { - return res, "", s3err.GetAPIError(s3err.ErrNoSuchKey) - } - if input.UploadId == nil { - return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload) - } - if input.MultipartUpload == nil { - return res, "", s3err.GetAPIError(s3err.ErrInvalidRequest) - } - - bucket := *input.Bucket - object := *input.Key - uploadID := *input.UploadId - parts := input.MultipartUpload.Parts - - _, err := os.Stat(bucket) - if errors.Is(err, fs.ErrNotExist) { - return res, "", s3err.GetAPIError(s3err.ErrNoSuchBucket) - } - if err != nil { - return res, "", fmt.Errorf("stat bucket: %w", err) - } - - sum, err := s.checkUploadIDExists(bucket, object, uploadID) - if err != nil { - return res, "", err - } - - objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum)) - - checksums, err := s.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID)) - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return res, "", fmt.Errorf("get mp checksums: %w", err) - } - - // ChecksumType should be the same as specified on CreateMultipartUpload - if input.ChecksumType != "" && checksums.Type != input.ChecksumType { - checksumType := checksums.Type - if checksumType == "" { - checksumType = types.ChecksumType("null") - } - - return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType) - } - - // check all parts ok - last := len(parts) - 1 - var totalsize int64 - - // The initialie values is the lower limit of partNumber: 0 - var partNumber int32 - for i, part := range parts { - if part.PartNumber == nil { - return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) - } - if *part.PartNumber < 1 { - return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber) - } - if *part.PartNumber <= partNumber { - return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder) - } - - partNumber = *part.PartNumber - - partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber)) - fullPartPath := filepath.Join(bucket, partObjPath) - fi, err := os.Lstat(fullPartPath) - if err != nil { - return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) - } - - totalsize += fi.Size() - // all parts except the last need to be greater, thena - // the minimum allowed size (5 Mib) - if i < last && fi.Size() < backend.MinPartSize { - return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall) - } - - b, err := s.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey) - etag := string(b) - if err != nil { - etag = "" - } - if parts[i].ETag == nil || !backend.AreEtagsSame(etag, *parts[i].ETag) { - return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) - } - - partChecksum, err := s.retrieveChecksums(nil, bucket, partObjPath) - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return res, "", fmt.Errorf("get part checksum: %w", err) - } - - // If checksum has been provided on mp initalization - err = validatePartChecksum(partChecksum, part) - if err != nil { - return res, "", err - } - } - - if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize { - return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize) - } - - // use totalsize=0 because we wont be writing to the file, only moving - // extents around. so we dont want to fallocate this. 
- f, err := s.openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, 0, acct) - if err != nil { - if errors.Is(err, syscall.EDQUOT) { - return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded) - } - return res, "", fmt.Errorf("open temp file: %w", err) - } - defer f.cleanup() - - for _, part := range parts { - if part.PartNumber == nil || *part.PartNumber < 1 { - return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) - } - - partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber)) - fullPartPath := filepath.Join(bucket, partObjPath) - pf, err := os.Open(fullPartPath) - if err != nil { - return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err) - } - - // scoutfs move data is a metadata only operation that moves the data - // extent references from the source, appeding to the destination. - // this needs to be 4k aligned. - err = moveData(pf, f.File()) - pf.Close() - if err != nil { - return res, "", fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err) - } - } - - userMetaData := make(map[string]string) - upiddir := filepath.Join(objdir, uploadID) - objMeta := s.loadUserMetaData(bucket, upiddir, userMetaData) - err = s.storeObjectMetadata(f.File(), bucket, object, objMeta) - if err != nil { - return res, "", err - } - - objname := filepath.Join(bucket, object) - dir := filepath.Dir(objname) - if dir != "" { - uid, gid, doChown := s.getChownIDs(acct) - err = backend.MkdirAll(dir, uid, gid, doChown, s.newDirPerm) - if err != nil { - return res, "", err - } - } - - for k, v := range userMetaData { - err = s.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) - if err != nil { - return res, "", fmt.Errorf("set user attr %q: %w", k, err) - } - } - - // load and set tagging - tagging, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr) - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return res, "", fmt.Errorf("get object tagging: %w", err) - } - if err == nil { - err := s.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging) - if err != nil { - return res, "", fmt.Errorf("set object tagging: %w", err) - } - } - - // load and set legal hold - lHold, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey) - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return res, "", fmt.Errorf("get object legal hold: %w", err) - } - if err == nil { - err := s.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold) - if err != nil { - return res, "", fmt.Errorf("set object legal hold: %w", err) - } - } - - // load and set retention - ret, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey) - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return res, "", fmt.Errorf("get object retention: %w", err) - } - if err == nil { - err := s.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret) - if err != nil { - return res, "", fmt.Errorf("set object retention: %w", err) - } - } - - // Calculate s3 compatible md5sum for complete multipart. 
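For reference on the comment above (part of the ScoutFS code being removed; the shared posix path computes the same value): the S3-compatible multipart ETag is conventionally the MD5 of the concatenated binary MD5 digests of the parts, hex encoded and suffixed with the part count. backend.GetMultipartMD5 is assumed to compute something equivalent from the part ETags; a minimal sketch:

// Sketch: conventional S3 multipart ETag from a list of per-part ETags.
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"strings"
)

func multipartETag(partETags []string) string {
	var concat []byte
	for _, e := range partETags {
		b, err := hex.DecodeString(strings.Trim(e, "\"")) // part ETags are hex md5 strings
		if err != nil {
			continue // skip malformed entries in this sketch
		}
		concat = append(concat, b...)
	}
	sum := md5.Sum(concat)
	return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:]), len(partETags))
}

func main() {
	fmt.Println(multipartETag([]string{
		"5d41402abc4b2a76b9719d911017c592",
		"7d793037a0760186574b0282f2f435e7",
	}))
}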
- s3MD5 := backend.GetMultipartMD5(parts) - - err = s.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5)) - if err != nil { - return res, "", fmt.Errorf("set etag attr: %w", err) - } - - err = f.link() - if err != nil { - return res, "", fmt.Errorf("link object in namespace: %w", err) - } - - // cleanup tmp dirs - os.RemoveAll(filepath.Join(bucket, upiddir)) - // use Remove for objdir in case there are still other uploads - // for same object name outstanding - os.Remove(filepath.Join(bucket, objdir)) - - return s3response.CompleteMultipartUploadResult{ - Bucket: &bucket, - ETag: &s3MD5, - Key: &object, - }, "", nil -} - -func (s *ScoutFS) storeObjectMetadata(f *os.File, bucket, object string, m objectMetadata) error { - if getString(m.ContentType) != "" { - err := s.meta.StoreAttribute(f, bucket, object, contentTypeHdr, []byte(*m.ContentType)) - if err != nil { - return fmt.Errorf("set content-type: %w", err) - } - } - if getString(m.ContentEncoding) != "" { - err := s.meta.StoreAttribute(f, bucket, object, contentEncHdr, []byte(*m.ContentEncoding)) - if err != nil { - return fmt.Errorf("set content-encoding: %w", err) - } - } - if getString(m.ContentDisposition) != "" { - err := s.meta.StoreAttribute(f, bucket, object, contentDispHdr, []byte(*m.ContentDisposition)) - if err != nil { - return fmt.Errorf("set content-disposition: %w", err) - } - } - if getString(m.ContentLanguage) != "" { - err := s.meta.StoreAttribute(f, bucket, object, contentLangHdr, []byte(*m.ContentLanguage)) - if err != nil { - return fmt.Errorf("set content-language: %w", err) - } - } - if getString(m.CacheControl) != "" { - err := s.meta.StoreAttribute(f, bucket, object, cacheCtrlHdr, []byte(*m.CacheControl)) - if err != nil { - return fmt.Errorf("set cache-control: %w", err) - } - } - if getString(m.Expires) != "" { - err := s.meta.StoreAttribute(f, bucket, object, expiresHdr, []byte(*m.Expires)) - if err != nil { - return fmt.Errorf("set cache-control: %w", err) - } - } - - return nil -} - -func validatePartChecksum(checksum s3response.Checksum, part types.CompletedPart) error { - n := numberOfChecksums(part) - if n > 1 { - return s3err.GetAPIError(s3err.ErrInvalidChecksumPart) - } - if checksum.Algorithm == "" { - if n != 0 { - return s3err.GetAPIError(s3err.ErrInvalidPart) - } - - return nil - } - - algo := checksum.Algorithm - if n == 0 { - return s3err.APIError{ - Code: "InvalidRequest", - Description: fmt.Sprintf("The upload was created using a %v checksum. The complete request must include the checksum for each part. 
It was missing for part %v in the request.", strings.ToLower(string(algo)), *part.PartNumber), - HTTPStatusCode: http.StatusBadRequest, - } - } - - for _, cs := range []struct { - checksum *string - expectedChecksum string - algo types.ChecksumAlgorithm - }{ - {part.ChecksumCRC32, getString(checksum.CRC32), types.ChecksumAlgorithmCrc32}, - {part.ChecksumCRC32C, getString(checksum.CRC32C), types.ChecksumAlgorithmCrc32c}, - {part.ChecksumSHA1, getString(checksum.SHA1), types.ChecksumAlgorithmSha1}, - {part.ChecksumSHA256, getString(checksum.SHA256), types.ChecksumAlgorithmSha256}, - {part.ChecksumCRC64NVME, getString(checksum.CRC64NVME), types.ChecksumAlgorithmCrc64nvme}, - } { - if cs.checksum == nil { - continue - } - - if !utils.IsValidChecksum(*cs.checksum, cs.algo) { - return s3err.GetAPIError(s3err.ErrInvalidChecksumPart) - } - - if *cs.checksum != cs.expectedChecksum { - if algo == cs.algo { - return s3err.GetAPIError(s3err.ErrInvalidPart) - } - - return s3err.APIError{ - Code: "BadDigest", - Description: fmt.Sprintf("The %v you specified for part %v did not match what we received.", strings.ToLower(string(cs.algo)), *part.PartNumber), - HTTPStatusCode: http.StatusBadRequest, - } - } - } - - return nil -} - -func numberOfChecksums(part types.CompletedPart) int { - counter := 0 - if getString(part.ChecksumCRC32) != "" { - counter++ - } - if getString(part.ChecksumCRC32C) != "" { - counter++ - } - if getString(part.ChecksumSHA1) != "" { - counter++ - } - if getString(part.ChecksumSHA256) != "" { - counter++ - } - if getString(part.ChecksumCRC64NVME) != "" { - counter++ - } - - return counter -} - -func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) { - sum := sha256.Sum256([]byte(object)) - objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum)) - - _, err := os.Stat(filepath.Join(objdir, uploadID)) - if errors.Is(err, fs.ErrNotExist) { - return [32]byte{}, s3err.GetAPIError(s3err.ErrNoSuchUpload) - } - if err != nil { - return [32]byte{}, fmt.Errorf("stat upload: %w", err) - } - return sum, nil -} - -type objectMetadata struct { - ContentType *string - ContentEncoding *string - ContentDisposition *string - ContentLanguage *string - CacheControl *string - Expires *string -} - -// fll out the user metadata map with the metadata for the object -// and return the content type and encoding -func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) objectMetadata { - ents, err := s.meta.ListAttributes(bucket, object) - if err != nil || len(ents) == 0 { - return objectMetadata{} - } - for _, e := range ents { - if !isValidMeta(e) { - continue - } - b, err := s.meta.RetrieveAttribute(nil, bucket, object, e) - if err != nil { - continue - } - if b == nil { - m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = "" - continue - } - m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = string(b) - } - - var result objectMetadata - - b, err := s.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr) - if err == nil { - result.ContentType = backend.GetPtrFromString(string(b)) - } - - b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr) - if err == nil { - result.ContentEncoding = backend.GetPtrFromString(string(b)) - } - - b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentDispHdr) - if err == nil { - result.ContentDisposition = backend.GetPtrFromString(string(b)) - } - - b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentLangHdr) - if err == nil { - result.ContentLanguage 
= backend.GetPtrFromString(string(b)) - } - - b, err = s.meta.RetrieveAttribute(nil, bucket, object, cacheCtrlHdr) - if err == nil { - result.CacheControl = backend.GetPtrFromString(string(b)) - } - - b, err = s.meta.RetrieveAttribute(nil, bucket, object, expiresHdr) - if err == nil { - result.Expires = backend.GetPtrFromString(string(b)) - } - - return result -} - -func isValidMeta(val string) bool { - if strings.HasPrefix(val, metaHdr) { - return true - } - if strings.EqualFold(val, "Expires") { - return true - } - return false + return s.Posix.CompleteMultipartUploadWithCopy(ctx, input, moveData) } func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { @@ -727,210 +234,47 @@ func (s *ScoutFS) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3. } func (s *ScoutFS) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) { - bucket := *input.Bucket - prefix := "" - if input.Prefix != nil { - prefix = *input.Prefix + if s.glaciermode { + return s.Posix.ListObjectsParametrized(ctx, input, s.glacierFileToObj) + } else { + return s.Posix.ListObjects(ctx, input) } - marker := "" - if input.Marker != nil { - marker = *input.Marker - } - delim := "" - if input.Delimiter != nil { - delim = *input.Delimiter - } - maxkeys := int32(0) - if input.MaxKeys != nil { - maxkeys = *input.MaxKeys - } - - _, err := os.Stat(bucket) - if errors.Is(err, fs.ErrNotExist) { - return s3response.ListObjectsResult{}, s3err.GetAPIError(s3err.ErrNoSuchBucket) - } - if err != nil { - return s3response.ListObjectsResult{}, fmt.Errorf("stat bucket: %w", err) - } - - fileSystem := os.DirFS(bucket) - results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, maxkeys, - s.fileToObj(bucket), []string{metaTmpDir}) - if err != nil { - return s3response.ListObjectsResult{}, fmt.Errorf("walk %v: %w", bucket, err) - } - - return s3response.ListObjectsResult{ - CommonPrefixes: results.CommonPrefixes, - Contents: results.Objects, - Delimiter: backend.GetPtrFromString(delim), - Marker: backend.GetPtrFromString(marker), - NextMarker: backend.GetPtrFromString(results.NextMarker), - Prefix: backend.GetPtrFromString(prefix), - IsTruncated: &results.Truncated, - MaxKeys: &maxkeys, - Name: &bucket, - }, nil } func (s *ScoutFS) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error) { - bucket := *input.Bucket - prefix := "" - if input.Prefix != nil { - prefix = *input.Prefix + if s.glaciermode { + return s.Posix.ListObjectsV2Parametrized(ctx, input, s.glacierFileToObj) + } else { + return s.Posix.ListObjectsV2(ctx, input) } - marker := "" - if input.ContinuationToken != nil { - if input.StartAfter != nil { - marker = max(*input.StartAfter, *input.ContinuationToken) - } else { - marker = *input.ContinuationToken - } - } - delim := "" - if input.Delimiter != nil { - delim = *input.Delimiter - } - maxkeys := int32(0) - if input.MaxKeys != nil { - maxkeys = *input.MaxKeys - } - - _, err := os.Stat(bucket) - if errors.Is(err, fs.ErrNotExist) { - return s3response.ListObjectsV2Result{}, s3err.GetAPIError(s3err.ErrNoSuchBucket) - } - if err != nil { - return s3response.ListObjectsV2Result{}, fmt.Errorf("stat bucket: %w", err) - } - - fileSystem := os.DirFS(bucket) - results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, int32(maxkeys), - s.fileToObj(bucket), []string{metaTmpDir}) - if err != nil { - return s3response.ListObjectsV2Result{}, fmt.Errorf("walk %v: %w", 
bucket, err) - } - - count := int32(len(results.Objects)) - - return s3response.ListObjectsV2Result{ - CommonPrefixes: results.CommonPrefixes, - Contents: results.Objects, - IsTruncated: &results.Truncated, - MaxKeys: &maxkeys, - Name: &bucket, - KeyCount: &count, - Delimiter: backend.GetPtrFromString(delim), - ContinuationToken: backend.GetPtrFromString(marker), - NextContinuationToken: backend.GetPtrFromString(results.NextMarker), - Prefix: backend.GetPtrFromString(prefix), - StartAfter: backend.GetPtrFromString(*input.StartAfter), - }, nil } -func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc { +// glacierFileToObj wraps the posix FileToObj for ListObjects calls and reports the Glacier storage class when a file has offline extents +func (s *ScoutFS) glacierFileToObj(bucket string, fetchOwner bool) backend.GetObjFunc { + posixFileToObj := s.Posix.FileToObj(bucket, fetchOwner) + return func(path string, d fs.DirEntry) (s3response.Object, error) { + res, err := posixFileToObj(path, d) + if err != nil || d.IsDir() { + return res, err + } objPath := filepath.Join(bucket, path) - if d.IsDir() { - // directory object only happens if directory empty - // check to see if this is a directory object by checking etag - etagBytes, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey) - if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) { - return s3response.Object{}, backend.ErrSkipObj - } - if err != nil { - return s3response.Object{}, fmt.Errorf("get etag: %w", err) - } - etag := string(etagBytes) - - fi, err := d.Info() - if errors.Is(err, fs.ErrNotExist) { - return s3response.Object{}, backend.ErrSkipObj - } - if err != nil { - return s3response.Object{}, fmt.Errorf("get fileinfo: %w", err) - } - - size := int64(0) - mtime := fi.ModTime() - - return s3response.Object{ - ETag: &etag, - Key: &path, - LastModified: &mtime, - Size: &size, - StorageClass: types.ObjectStorageClassStandard, - }, nil - } - - // Retreive the object checksum algorithm - checksums, err := s.retrieveChecksums(nil, bucket, path) - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return s3response.Object{}, backend.ErrSkipObj - } - - // file object, get object info and fill out object data - b, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey) - if errors.Is(err, fs.ErrNotExist) { - return s3response.Object{}, backend.ErrSkipObj - } - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return s3response.Object{}, fmt.Errorf("get etag: %w", err) - } - // note: meta.ErrNoSuchKey will return etagBytes = []byte{} - // so this will just set etag to "" if its not already set - - etag := string(b) - - fi, err := d.Info() + // Check if there are any offline extents associated with this file. + // If so, we will report the Glacier storage class + st, err := statMore(objPath) if errors.Is(err, fs.ErrNotExist) { return s3response.Object{}, backend.ErrSkipObj } if err != nil { - return s3response.Object{}, fmt.Errorf("get fileinfo: %w", err) + return s3response.Object{}, fmt.Errorf("stat more: %w", err) } - - sc := types.ObjectStorageClassStandard - if s.glaciermode { - // Check if there are any offline exents associated with this file. - // If so, we will return the InvalidObjectState error.
- st, err := statMore(objPath) - if errors.Is(err, fs.ErrNotExist) { - return s3response.Object{}, backend.ErrSkipObj - } - if err != nil { - return s3response.Object{}, fmt.Errorf("stat more: %w", err) - } - if st.Offline_blocks != 0 { - sc = types.ObjectStorageClassGlacier - } + if st.Offline_blocks != 0 { + res.StorageClass = types.ObjectStorageClassGlacier } - - size := fi.Size() - mtime := fi.ModTime() - - return s3response.Object{ - ETag: &etag, - Key: &path, - LastModified: &mtime, - Size: &size, - StorageClass: sc, - ChecksumAlgorithm: []types.ChecksumAlgorithm{checksums.Algorithm}, - ChecksumType: checksums.Type, - }, nil + return res, nil } } -func (s *ScoutFS) retrieveChecksums(f *os.File, bucket, object string) (checksums s3response.Checksum, err error) { - checksumsAtr, err := s.meta.RetrieveAttribute(f, bucket, object, checksumsKey) - if err != nil { - return checksums, err - } - - err = json.Unmarshal(checksumsAtr, &checksums) - return checksums, err -} - // RestoreObject will set stage request on file if offline and do nothing if // file is online func (s *ScoutFS) RestoreObject(_ context.Context, input *s3.RestoreObjectInput) error { @@ -956,13 +300,6 @@ func (s *ScoutFS) RestoreObject(_ context.Context, input *s3.RestoreObjectInput) return nil } -func getString(str *string) string { - if str == nil { - return "" - } - return *str -} - func isStaging(objname string) (bool, error) { b, err := xattr.Get(objname, flagskey) if err != nil && !isNoAttr(err) { diff --git a/backend/scoutfs/scoutfs_compat.go b/backend/scoutfs/scoutfs_compat.go index 0a590ba..4573a7b 100644 --- a/backend/scoutfs/scoutfs_compat.go +++ b/backend/scoutfs/scoutfs_compat.go @@ -17,22 +17,13 @@ package scoutfs import ( - "errors" "fmt" - "io/fs" "os" - "path/filepath" - "strconv" - "syscall" - - "golang.org/x/sys/unix" "github.com/versity/scoutfs-go" - "github.com/versity/versitygw/auth" - "github.com/versity/versitygw/backend" "github.com/versity/versitygw/backend/meta" "github.com/versity/versitygw/backend/posix" - "github.com/versity/versitygw/s3err" + "github.com/versity/versitygw/s3api/debuglogger" ) func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) { @@ -57,148 +48,31 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) { Posix: p, rootfd: f, rootdir: rootdir, - meta: metastore, - chownuid: opts.ChownUID, - chowngid: opts.ChownGID, glaciermode: opts.GlacierMode, - newDirPerm: opts.NewDirPerm, disableNoArchive: opts.DisableNoArchive, }, nil } -const procfddir = "/proc/self/fd" - -type tmpfile struct { - f *os.File - bucket string - objname string - size int64 - needsChown bool - uid int - gid int - newDirPerm fs.FileMode -} - -var ( - defaultFilePerm uint32 = 0644 -) - -func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Account) (*tmpfile, error) { - uid, gid, doChown := s.getChownIDs(acct) - - // O_TMPFILE allows for a file handle to an unnamed file in the filesystem. - // This can help reduce contention within the namespace (parent directories), - // etc. And will auto cleanup the inode on close if we never link this - // file descriptor into the namespace. 
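The comment above, in the ScoutFS copy of openTmpFile that this patch removes (the shared posix implementation keeps the same approach), describes the O_TMPFILE technique. A minimal Linux-only sketch of opening an unnamed file and later giving it a name through /proc/self/fd, illustrative only and not versitygw code:

// Sketch: write to an unnamed O_TMPFILE inode, then link it into the
// namespace with linkat(2). If the process exits before the link, the
// inode is reclaimed automatically when the descriptor closes.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"

	"golang.org/x/sys/unix"
)

func writeThenLink(dir, name string, data []byte) error {
	fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, 0644)
	if err != nil {
		return fmt.Errorf("open tmpfile: %w", err)
	}
	f := os.NewFile(uintptr(fd), filepath.Join("/proc/self/fd", strconv.Itoa(fd)))
	defer f.Close()

	if _, err := f.Write(data); err != nil {
		return fmt.Errorf("write: %w", err)
	}
	// AT_SYMLINK_FOLLOW makes linkat resolve the /proc/self/fd symlink to
	// the open file, so the unnamed inode gains a real directory entry.
	err = unix.Linkat(unix.AT_FDCWD, f.Name(), unix.AT_FDCWD,
		filepath.Join(dir, name), unix.AT_SYMLINK_FOLLOW)
	if err != nil {
		return fmt.Errorf("linkat: %w", err)
	}
	return nil
}

func main() {
	if err := writeThenLink(".", "obj", []byte("hello")); err != nil {
		fmt.Println(err)
	}
}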
- fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, defaultFilePerm) - if err != nil { - if errors.Is(err, syscall.EROFS) { - return nil, s3err.GetAPIError(s3err.ErrMethodNotAllowed) - } - return nil, err - } - - // for O_TMPFILE, filename is /proc/self/fd/ to be used - // later to link file into namespace - f := os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd))) - - tmp := &tmpfile{ - f: f, - bucket: bucket, - objname: obj, - size: size, - needsChown: doChown, - uid: uid, - gid: gid, - newDirPerm: s.newDirPerm, - } - - if doChown { - err := f.Chown(uid, gid) - if err != nil { - return nil, fmt.Errorf("set temp file ownership: %w", err) - } - } - - return tmp, nil -} - -func (tmp *tmpfile) link() error { - // We use Linkat/Rename as the atomic operation for object puts. The - // upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict - // with any other simultaneous uploads. The final operation is to move the - // temp file into place for the object. This ensures the object semantics - // of last upload completed wins and is not some combination of writes - // from simultaneous uploads. - objPath := filepath.Join(tmp.bucket, tmp.objname) - err := os.Remove(objPath) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("remove stale path: %w", err) - } - - dir := filepath.Dir(objPath) - - err = backend.MkdirAll(dir, tmp.uid, tmp.gid, tmp.needsChown, tmp.newDirPerm) - if err != nil { - return fmt.Errorf("make parent dir: %w", err) - } - - procdir, err := os.Open(procfddir) - if err != nil { - return fmt.Errorf("open proc dir: %w", err) - } - defer procdir.Close() - - dirf, err := os.Open(dir) - if err != nil { - return fmt.Errorf("open parent dir: %w", err) - } - defer dirf.Close() - - for { - err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), - int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW) - if errors.Is(err, fs.ErrExist) { - err := os.Remove(objPath) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("remove stale path: %w", err) - } - continue - } - if err != nil { - return fmt.Errorf("link tmpfile: %w", err) - } - break - } - - err = tmp.f.Close() - if err != nil { - return fmt.Errorf("close tmpfile: %w", err) - } - - return nil -} - -func (tmp *tmpfile) Write(b []byte) (int, error) { - if int64(len(b)) > tmp.size { - return 0, fmt.Errorf("write exceeds content length %v", tmp.size) - } - - n, err := tmp.f.Write(b) - tmp.size -= int64(n) - return n, err -} - -func (tmp *tmpfile) cleanup() { - tmp.f.Close() -} - -func (tmp *tmpfile) File() *os.File { - return tmp.f -} - func moveData(from *os.File, to *os.File) error { - return scoutfs.MoveData(from, to) + // May fail if the files are not 4K aligned; check for alignment + ffi, err := from.Stat() + if err != nil { + return fmt.Errorf("stat from: %v", err) + } + tfi, err := to.Stat() + if err != nil { + return fmt.Errorf("stat to: %v", err) + } + if ffi.Size()%4096 != 0 || tfi.Size()%4096 != 0 { + return os.ErrInvalid + } + + // + err = scoutfs.MoveData(from, to) + if err != nil { + debuglogger.Logf("ScoutFs MoveData failed: %v", err) + } + return err } func statMore(path string) (stat, error) { diff --git a/backend/scoutfs/scoutfs_incompat.go b/backend/scoutfs/scoutfs_incompat.go index 8d3f09c..280c846 100644 --- a/backend/scoutfs/scoutfs_incompat.go +++ b/backend/scoutfs/scoutfs_incompat.go @@ -20,44 +20,16 @@ import ( "errors" "fmt" "os" - - "github.com/versity/versitygw/auth" ) func New(rootdir string, opts 
ScoutfsOpts) (*ScoutFS, error) { return nil, fmt.Errorf("scoutfs only available on linux") } -type tmpfile struct{} - var ( errNotSupported = errors.New("not supported") ) -func (s *ScoutFS) openTmpFile(_, _, _ string, _ int64, _ auth.Account) (*tmpfile, error) { - // make these look used for static check - _ = s.chownuid - _ = s.chowngid - _ = s.euid - _ = s.egid - return nil, errNotSupported -} - -func (tmp *tmpfile) link() error { - return errNotSupported -} - -func (tmp *tmpfile) Write(b []byte) (int, error) { - return 0, errNotSupported -} - -func (tmp *tmpfile) cleanup() { -} - -func (tmp *tmpfile) File() *os.File { - return nil -} - func moveData(_, _ *os.File) error { return errNotSupported }
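Taken together, the new alignment check in the ScoutFS moveData and the io.Copy fallback in posix CompleteMultipartUploadWithCopy mean that an unaligned part simply degrades to a normal copy instead of failing the upload. A minimal sketch of a mover wrapper that applies the same 4 KiB guard around any concrete zero-copy move (helper names here are hypothetical):

// Sketch: only attempt the zero-copy path when both files are 4 KiB
// aligned; otherwise return an error so the caller's io.Copy fallback
// takes over.
package main

import (
	"fmt"
	"os"
)

const blockSize = 4096

func alignedMove(move func(from, to *os.File) error) func(from, to *os.File) error {
	return func(from, to *os.File) error {
		ffi, err := from.Stat()
		if err != nil {
			return fmt.Errorf("stat from: %w", err)
		}
		tfi, err := to.Stat()
		if err != nil {
			return fmt.Errorf("stat to: %w", err)
		}
		if ffi.Size()%blockSize != 0 || tfi.Size()%blockSize != 0 {
			return os.ErrInvalid // caller falls back to io.Copy
		}
		return move(from, to)
	}
}

func main() {
	// Wrap a concrete mover (for example a filesystem ioctl) with the guard.
	move := alignedMove(func(from, to *os.File) error { return nil })
	fmt.Printf("mover ready: %T\n", move)
}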