Mirror of https://github.com/versity/versitygw.git, synced 2025-12-23 13:15:18 +00:00
Merge pull request #1435 from ondrap/pr2
Fix O_TMPFILE Linkat race, cleanup of scoutfs integration, fix MoveData non-aligned problem
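The heart of the Linkat race fix, in one sketch: linkat(2) refuses to replace an existing directory entry, so the previous remove-then-link loop left a window in which a concurrent upload could race the removal. The patch below instead links the anonymous O_TMPFILE into the directory under a unique temporary name and then renames it over the object path, because renameat(2) replaces its target atomically. A minimal illustration of the pattern, assuming Linux and golang.org/x/sys/unix (the helper below is hypothetical; the real logic lives in tmpfile.link further down):

// publishTmpfile is an illustrative helper, not part of the patch.
// fdName is the O_TMPFILE's entry under /proc/self/fd; imports assumed:
// errors, fmt, os, syscall, time, golang.org/x/sys/unix.
func publishTmpfile(procdir, dirf *os.File, fdName, objName string) error {
    for retries := 0; retries < 3; retries++ {
        // unique intermediate name; a collision needs the same nanosecond
        tmpName := fmt.Sprintf(".%s.sgwtmp.%d", objName, time.Now().UnixNano())
        err := unix.Linkat(int(procdir.Fd()), fdName,
            int(dirf.Fd()), tmpName, unix.AT_SYMLINK_FOLLOW)
        if errors.Is(err, syscall.EEXIST) {
            continue // name already taken, retry with a new one
        }
        if err != nil {
            return err
        }
        // renameat(2) atomically replaces objName if it exists, so the
        // last completed upload wins and the object is never missing.
        return unix.Renameat(int(dirf.Fd()), tmpName, int(dirf.Fd()), objName)
    }
    return fmt.Errorf("cannot find free temporary file name")
}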
@@ -39,6 +39,7 @@ import (
    "github.com/versity/versitygw/auth"
    "github.com/versity/versitygw/backend"
    "github.com/versity/versitygw/backend/meta"
    "github.com/versity/versitygw/s3api/debuglogger"
    "github.com/versity/versitygw/s3api/utils"
    "github.com/versity/versitygw/s3err"
    "github.com/versity/versitygw/s3response"
@@ -82,8 +83,8 @@ type Posix struct {
var _ backend.Backend = &Posix{}

const (
    metaTmpDir          = ".sgwtmp"
    metaTmpMultipartDir = metaTmpDir + "/multipart"
    MetaTmpDir          = ".sgwtmp"
    MetaTmpMultipartDir = MetaTmpDir + "/multipart"
    onameAttr           = "objname"
    tagHdr              = "X-Amz-Tagging"
    metaHdr             = "X-Amz-Meta"
@@ -432,7 +433,7 @@ func (p *Posix) isBucketEmpty(bucket string) error {
        return fmt.Errorf("readdir bucket: %w", err)
    }
    if err == nil {
        if len(ents) == 1 && ents[0].Name() != metaTmpDir {
        if len(ents) == 1 && ents[0].Name() != MetaTmpDir {
            return s3err.GetAPIError(s3err.ErrVersionedBucketNotEmpty)
        } else if len(ents) > 1 {
            return s3err.GetAPIError(s3err.ErrVersionedBucketNotEmpty)
@@ -447,7 +448,7 @@ func (p *Posix) isBucketEmpty(bucket string) error {
    if errors.Is(err, fs.ErrNotExist) {
        return s3err.GetAPIError(s3err.ErrNoSuchBucket)
    }
    if len(ents) == 1 && ents[0].Name() != metaTmpDir {
    if len(ents) == 1 && ents[0].Name() != MetaTmpDir {
        return s3err.GetAPIError(s3err.ErrBucketNotEmpty)
    } else if len(ents) > 1 {
        return s3err.GetAPIError(s3err.ErrBucketNotEmpty)
@@ -693,7 +694,7 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun

    versionBucketPath := filepath.Join(p.versioningDir, bucket)
    versioningKey := filepath.Join(genObjVersionKey(key), versionId)
    versionTmpPath := filepath.Join(versionBucketPath, metaTmpDir)
    versionTmpPath := filepath.Join(versionBucketPath, MetaTmpDir)
    f, err := p.openTmpFile(versionTmpPath, versionBucketPath, versioningKey,
        size, acc, doFalloc, p.forceNoTmpFile)
    if err != nil {
@@ -764,7 +765,7 @@ func (p *Posix) ListObjectVersions(ctx context.Context, input *s3.ListObjectVers

    fileSystem := os.DirFS(bucket)
    results, err := backend.WalkVersions(ctx, fileSystem, prefix, delim, keyMarker, versionIdMarker, max,
        p.fileToObjVersions(bucket), []string{metaTmpDir})
        p.fileToObjVersions(bucket), []string{MetaTmpDir})
    if err != nil {
        return s3response.ListVersionsResult{}, fmt.Errorf("walk %v: %w", bucket, err)
    }
@@ -1210,7 +1211,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
    objNameSum := sha256.Sum256([]byte(*mpu.Key))
    // multiple uploads for same object name allowed,
    // they will all go into the same hashed name directory
    objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", objNameSum))
    objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", objNameSum))
    tmppath := filepath.Join(bucket, objdir)
    // the unique upload id is a directory for all of the parts
    // associated with this specific multipart upload
@@ -1360,6 +1361,10 @@ func getPartChecksum(algo types.ChecksumAlgorithm, part types.CompletedPart) str
}

func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
    return p.CompleteMultipartUploadWithCopy(ctx, input, nil)
}

func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.CompleteMultipartUploadInput, customMove func(from *os.File, to *os.File) error) (s3response.CompleteMultipartUploadResult, string, error) {
    acct, ok := ctx.Value("account").(auth.Account)
    if !ok {
        acct = auth.Account{}
@@ -1395,7 +1400,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
        return res, "", err
    }

    objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
    objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))

    checksums, err := p.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
    if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
@@ -1497,7 +1502,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
        }
    }

    f, err := p.openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object,
    f, err := p.openTmpFile(filepath.Join(bucket, MetaTmpDir), bucket, object,
        totalsize, acct, skipFalloc, p.forceNoTmpFile)
    if err != nil {
        if errors.Is(err, syscall.EDQUOT) {
@@ -1550,7 +1555,16 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
            }
        }

        if customMove != nil {
            err = customMove(pf, f.File())
            if err != nil {
                // Fall back to standard copy
                debuglogger.Logf("Custom data block move failed (%v), falling back to io.Copy()", err)
                _, err = io.Copy(f.File(), rdr)
            }
        } else {
            _, err = io.Copy(f.File(), rdr)
        }
        pf.Close()
        if err != nil {
            if errors.Is(err, syscall.EDQUOT) {
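The customMove hook above is the seam this PR adds: a backend may splice part data by reference, and any mover failure is logged and degrades to plain byte copying, so completion correctness never depends on the optimization. A minimal sketch of how a caller wires the new signature (wrapper and mover names are hypothetical, not part of the patch):

// completeWithFastPath shows the new hook; passing nil selects the plain
// io.Copy path, and a mover returning an error falls back per part.
func completeWithFastPath(ctx context.Context, p *posix.Posix,
    input *s3.CompleteMultipartUploadInput,
    mover func(from, to *os.File) error) (s3response.CompleteMultipartUploadResult, string, error) {
    return p.CompleteMultipartUploadWithCopy(ctx, input, mover)
}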
@@ -1824,7 +1838,7 @@ func numberOfChecksums(part types.CompletedPart) int {

func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) {
    sum := sha256.Sum256([]byte(object))
    objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
    objdir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum))

    _, err := os.Stat(filepath.Join(objdir, uploadID))
    if errors.Is(err, fs.ErrNotExist) {
@@ -1838,7 +1852,7 @@ func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte,

func (p *Posix) retrieveUploadId(bucket, object string) (string, [32]byte, error) {
    sum := sha256.Sum256([]byte(object))
    objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
    objdir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum))

    entries, err := os.ReadDir(objdir)
    if err != nil || len(entries) == 0 {
@@ -1990,7 +2004,7 @@ func (p *Posix) AbortMultipartUpload(_ context.Context, mpu *s3.AbortMultipartUp
    }

    sum := sha256.Sum256([]byte(object))
    objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
    objdir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum))

    _, err = os.Stat(filepath.Join(objdir, uploadID))
    if err != nil {
@@ -2028,7 +2042,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl
    }

    // ignore readdir error and use the empty list returned
    objs, _ := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir))
    objs, _ := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir))

    var uploads []s3response.Upload
    var resultUpds []s3response.Upload
@@ -2048,7 +2062,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl
            continue
        }

        b, err := p.meta.RetrieveAttribute(nil, bucket, filepath.Join(metaTmpMultipartDir, obj.Name()), onameAttr)
        b, err := p.meta.RetrieveAttribute(nil, bucket, filepath.Join(MetaTmpMultipartDir, obj.Name()), onameAttr)
        if err != nil {
            continue
        }
@@ -2057,7 +2071,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl
            continue
        }

        upids, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, obj.Name()))
        upids, err := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir, obj.Name()))
        if err != nil {
            continue
        }
@@ -2084,7 +2098,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl
            keyMarkerInd = len(uploads)
        }

        checksum, err := p.retrieveChecksums(nil, bucket, filepath.Join(metaTmpMultipartDir, obj.Name(), uploadID))
        checksum, err := p.retrieveChecksums(nil, bucket, filepath.Join(MetaTmpMultipartDir, obj.Name(), uploadID))
        if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
            return lmu, fmt.Errorf("get mp checksum: %w", err)
        }
@@ -2200,7 +2214,7 @@ func (p *Posix) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3resp
        return lpr, err
    }

    objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
    objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
    tmpdir := filepath.Join(bucket, objdir)

    ents, err := os.ReadDir(filepath.Join(tmpdir, uploadID))
@@ -2337,7 +2351,7 @@ func (p *Posix) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.
    }

    sum := sha256.Sum256([]byte(object))
    objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
    objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
    mpPath := filepath.Join(objdir, uploadID)

    _, err = os.Stat(filepath.Join(bucket, mpPath))
@@ -2511,7 +2525,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
    }

    sum := sha256.Sum256([]byte(*upi.Key))
    objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
    objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))

    _, err = os.Stat(filepath.Join(*upi.Bucket, objdir, *upi.UploadId))
    if errors.Is(err, fs.ErrNotExist) {
@@ -2821,7 +2835,7 @@ func (p *Posix) PutObject(ctx context.Context, po s3response.PutObjectInput) (s3
        return s3response.PutObjectOutput{}, fmt.Errorf("stat object: %w", err)
    }

    f, err := p.openTmpFile(filepath.Join(*po.Bucket, metaTmpDir),
    f, err := p.openTmpFile(filepath.Join(*po.Bucket, MetaTmpDir),
        *po.Bucket, *po.Key, contentLength, acct, doFalloc, p.forceNoTmpFile)
    if err != nil {
        if errors.Is(err, syscall.EDQUOT) {
@@ -3175,7 +3189,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
        acct = auth.Account{}
    }

    f, err := p.openTmpFile(filepath.Join(bucket, metaTmpDir),
    f, err := p.openTmpFile(filepath.Join(bucket, MetaTmpDir),
        bucket, object, srcObjVersion.Size(), acct, doFalloc,
        p.forceNoTmpFile)
    if err != nil {
@@ -3641,7 +3655,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
        return nil, err
    }

    ents, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId))
    ents, err := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId))
    if errors.Is(err, fs.ErrNotExist) {
        return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
    }
@@ -3649,7 +3663,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
        return nil, fmt.Errorf("read parts: %w", err)
    }

    partPath := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber))
    partPath := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber))

    part, err := os.Stat(filepath.Join(bucket, partPath))
    if errors.Is(err, fs.ErrNotExist) {
@@ -4201,6 +4215,10 @@ func (p *Posix) CopyObject(ctx context.Context, input s3response.CopyObjectInput
}

func (p *Posix) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) {
    return p.ListObjectsParametrized(ctx, input, p.FileToObj)
}

func (p *Posix) ListObjectsParametrized(ctx context.Context, input *s3.ListObjectsInput, customFileToObj func(string, bool) backend.GetObjFunc) (s3response.ListObjectsResult, error) {
    bucket := *input.Bucket
    prefix := ""
    if input.Prefix != nil {
@@ -4229,7 +4247,7 @@ func (p *Posix) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3

    fileSystem := os.DirFS(bucket)
    results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, maxkeys,
        p.fileToObj(bucket, true), []string{metaTmpDir})
        customFileToObj(bucket, true), []string{MetaTmpDir})
    if err != nil {
        return s3response.ListObjectsResult{}, fmt.Errorf("walk %v: %w", bucket, err)
    }
@@ -4247,7 +4265,7 @@ func (p *Posix) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3
    }, nil
}

func (p *Posix) fileToObj(bucket string, fetchOwner bool) backend.GetObjFunc {
func (p *Posix) FileToObj(bucket string, fetchOwner bool) backend.GetObjFunc {
    return func(path string, d fs.DirEntry) (s3response.Object, error) {
        var owner *types.Owner
        // Retrieve the object owner data from bucket ACL, if fetchOwner is true
@@ -4350,6 +4368,10 @@ func (p *Posix) fileToObj(bucket string, fetchOwner bool) backend.GetObjFunc {
}

func (p *Posix) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error) {
    return p.ListObjectsV2Parametrized(ctx, input, p.FileToObj)
}

func (p *Posix) ListObjectsV2Parametrized(ctx context.Context, input *s3.ListObjectsV2Input, customFileToObj func(string, bool) backend.GetObjFunc) (s3response.ListObjectsV2Result, error) {
    bucket := *input.Bucket
    prefix := ""
    if input.Prefix != nil {
@@ -4386,7 +4408,7 @@ func (p *Posix) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input)

    fileSystem := os.DirFS(bucket)
    results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, maxkeys,
        p.fileToObj(bucket, fetchOwner), []string{metaTmpDir})
        customFileToObj(bucket, fetchOwner), []string{MetaTmpDir})
    if err != nil {
        return s3response.ListObjectsV2Result{}, fmt.Errorf("walk %v: %w", bucket, err)
    }

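ListObjectsParametrized, ListObjectsV2Parametrized, and the newly exported FileToObj make the per-entry conversion pluggable: a wrapping backend supplies its own converter instead of reimplementing the whole listing walk. A hypothetical decorator (not part of the patch) shows the shape such a customFileToObj takes; ScoutFS's glacierFileToObj later in this diff follows the same pattern:

// withStorageClass wraps an existing converter factory and stamps every
// returned object with a fixed storage class. Purely illustrative.
func withStorageClass(inner func(string, bool) backend.GetObjFunc,
    sc types.ObjectStorageClass) func(string, bool) backend.GetObjFunc {
    return func(bucket string, fetchOwner bool) backend.GetObjFunc {
        conv := inner(bucket, fetchOwner)
        return func(path string, d fs.DirEntry) (s3response.Object, error) {
            obj, err := conv(path, d)
            if err != nil {
                return obj, err
            }
            obj.StorageClass = sc
            return obj, nil
        }
    }
}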
@@ -26,6 +26,7 @@ import (
    "path/filepath"
    "strconv"
    "syscall"
    "time"

    "github.com/versity/versitygw/auth"
    "github.com/versity/versitygw/backend"
@@ -165,14 +166,10 @@ func (tmp *tmpfile) link() error {
    // of last upload completed wins and is not some combination of writes
    // from simultaneous uploads.
    objPath := filepath.Join(tmp.bucket, tmp.objname)
    err := os.Remove(objPath)
    if err != nil && !errors.Is(err, fs.ErrNotExist) {
        return fmt.Errorf("remove stale path: %w", err)
    }

    dir := filepath.Dir(objPath)

    err = backend.MkdirAll(dir, tmp.uid, tmp.gid, tmp.needsChown, tmp.newDirPerm)
    err := backend.MkdirAll(dir, tmp.uid, tmp.gid, tmp.needsChown, tmp.newDirPerm)
    if err != nil {
        return fmt.Errorf("make parent dir: %w", err)
    }
@@ -194,22 +191,34 @@ func (tmp *tmpfile) link() error {
    }
    defer dirf.Close()

    for {
        err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
            int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW)
        if errors.Is(err, syscall.EEXIST) {
            err := os.Remove(objPath)
            if err != nil && !errors.Is(err, fs.ErrNotExist) {
                return fmt.Errorf("remove stale path: %w", err)
            }
    // Linkat cannot overwrite files; we will allocate a temporary file, Linkat to it and then Renameat it
    // to avoid a potential race condition
    retries := 1
    for {
        tmpName := fmt.Sprintf(".%s.sgwtmp.%d", filepath.Base(objPath), time.Now().UnixNano())
        err := unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
            int(dirf.Fd()), tmpName, unix.AT_SYMLINK_FOLLOW)
        if errors.Is(err, syscall.EEXIST) && retries < 3 {
            retries += 1
            continue
        }
        if err != nil {
            return fmt.Errorf("link tmpfile (fd %q as %q): %w",
                filepath.Base(tmp.f.Name()), objPath, err)
            return fmt.Errorf("cannot find free temporary file: %w", err)
        }

        err = unix.Renameat(int(dirf.Fd()), tmpName, int(dirf.Fd()), filepath.Base(objPath))
        if err != nil {
            return fmt.Errorf("overwriting renameat failed: %w", err)
        }
        break
    }
    } else if err != nil {
        return fmt.Errorf("link tmpfile (fd %q as %q): %w",
            filepath.Base(tmp.f.Name()), objPath, err)
    }

    err = tmp.f.Close()
    if err != nil {

@@ -21,7 +21,6 @@ import (
    "errors"
    "fmt"
    "io/fs"
    "net/http"
    "os"
    "path/filepath"
    "strings"
@@ -30,11 +29,8 @@ import (
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
    "github.com/pkg/xattr"
    "github.com/versity/versitygw/auth"
    "github.com/versity/versitygw/backend"
    "github.com/versity/versitygw/backend/meta"
    "github.com/versity/versitygw/backend/posix"
    "github.com/versity/versitygw/s3api/utils"
    "github.com/versity/versitygw/s3err"
    "github.com/versity/versitygw/s3response"
)
@@ -53,9 +49,6 @@ type ScoutFS struct {
    rootfd  *os.File
    rootdir string

    // bucket/object metadata storage facility
    meta meta.MetadataStorer

    // glaciermode enables the following behavior:
    // GET object: if file offline, return invalid object state
    // HEAD object: if file offline, set obj storage class to GLACIER
@@ -67,19 +60,6 @@ type ScoutFS struct {
    // RestoreObject: add batch stage request to file
    glaciermode bool

    // chownuid/gid enable chowning of files to the account uid/gid
    // when objects are uploaded
    chownuid bool
    chowngid bool

    // euid/egid are the effective uid/gid of the running versitygw process
    // used to determine if chowning is needed
    euid int
    egid int

    // newDirPerm is the permissions to use when creating new directories
    newDirPerm fs.FileMode

    // disableNoArchive is used to disable setting scoutam noarchive flag
    // on multipart parts. This is enabled by default to prevent archive
    // copies of temporary multipart parts.
@@ -89,24 +69,6 @@ type ScoutFS struct {
var _ backend.Backend = &ScoutFS{}

const (
    metaTmpDir          = ".sgwtmp"
    metaTmpMultipartDir = metaTmpDir + "/multipart"
    tagHdr              = "X-Amz-Tagging"
    metaHdr             = "X-Amz-Meta"
    contentTypeHdr      = "content-type"
    contentEncHdr       = "content-encoding"
    contentLangHdr      = "content-language"
    contentDispHdr      = "content-disposition"
    cacheCtrlHdr        = "cache-control"
    expiresHdr          = "expires"
    emptyMD5            = "d41d8cd98f00b204e9800998ecf8427e"
    etagkey             = "etag"
    checksumsKey        = "checksums"
    objectRetentionKey  = "object-retention"
    objectLegalHoldKey  = "object-legal-hold"
)

var (
    stageComplete      = "ongoing-request=\"false\", expiry-date=\"Fri, 2 Dec 2050 00:00:00 GMT\""
    stageInProgress    = "true"
    stageNotInProgress = "false"
@@ -146,25 +108,6 @@ func (*ScoutFS) String() string {
    return "ScoutFS Gateway"
}

// getChownIDs returns the uid and gid that should be used for chowning
// the object to the account uid/gid. It also returns a boolean indicating
// if chowning is needed.
func (s *ScoutFS) getChownIDs(acct auth.Account) (int, int, bool) {
    uid := s.euid
    gid := s.egid
    var needsChown bool
    if s.chownuid && acct.UserID != s.euid {
        uid = acct.UserID
        needsChown = true
    }
    if s.chowngid && acct.GroupID != s.egid {
        gid = acct.GroupID
        needsChown = true
    }

    return uid, gid, needsChown
}

func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
    out, err := s.Posix.UploadPart(ctx, input)
    if err != nil {
@@ -175,7 +118,7 @@ func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s
        sum := sha256.Sum256([]byte(*input.Key))
        partPath := filepath.Join(
            *input.Bucket,                        // bucket
            metaTmpMultipartDir,                  // temp multipart dir
            posix.MetaTmpMultipartDir,            // temp multipart dir
            fmt.Sprintf("%x", sum),               // hashed objname
            *input.UploadId,                      // upload id
            fmt.Sprintf("%v", *input.PartNumber), // part number
@@ -194,443 +137,7 @@ func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s
// ioctl to not have to read and copy the part data to the final object. This
// saves a read and write cycle for all multipart uploads.
func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
    acct, ok := ctx.Value("account").(auth.Account)
    if !ok {
        acct = auth.Account{}
    }

    var res s3response.CompleteMultipartUploadResult

    if input.Key == nil {
        return res, "", s3err.GetAPIError(s3err.ErrNoSuchKey)
    }
    if input.UploadId == nil {
        return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
    }
    if input.MultipartUpload == nil {
        return res, "", s3err.GetAPIError(s3err.ErrInvalidRequest)
    }

    bucket := *input.Bucket
    object := *input.Key
    uploadID := *input.UploadId
    parts := input.MultipartUpload.Parts

    _, err := os.Stat(bucket)
    if errors.Is(err, fs.ErrNotExist) {
        return res, "", s3err.GetAPIError(s3err.ErrNoSuchBucket)
    }
    if err != nil {
        return res, "", fmt.Errorf("stat bucket: %w", err)
    }

    sum, err := s.checkUploadIDExists(bucket, object, uploadID)
    if err != nil {
        return res, "", err
    }

    objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))

    checksums, err := s.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
    if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
        return res, "", fmt.Errorf("get mp checksums: %w", err)
    }

    // ChecksumType should be the same as specified on CreateMultipartUpload
    if input.ChecksumType != "" && checksums.Type != input.ChecksumType {
        checksumType := checksums.Type
        if checksumType == "" {
            checksumType = types.ChecksumType("null")
        }

        return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
    }

    // check all parts ok
    last := len(parts) - 1
    var totalsize int64

    // The initial value is the lower limit of partNumber: 0
    var partNumber int32
    for i, part := range parts {
        if part.PartNumber == nil {
            return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
        }
        if *part.PartNumber < 1 {
            return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
        }
        if *part.PartNumber <= partNumber {
            return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
        }

        partNumber = *part.PartNumber

        partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
        fullPartPath := filepath.Join(bucket, partObjPath)
        fi, err := os.Lstat(fullPartPath)
        if err != nil {
            return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
        }

        totalsize += fi.Size()
        // all parts except the last need to be greater than
        // the minimum allowed size (5 MiB)
        if i < last && fi.Size() < backend.MinPartSize {
            return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
        }

        b, err := s.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
        etag := string(b)
        if err != nil {
            etag = ""
        }
        if parts[i].ETag == nil || !backend.AreEtagsSame(etag, *parts[i].ETag) {
            return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
        }

        partChecksum, err := s.retrieveChecksums(nil, bucket, partObjPath)
        if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
            return res, "", fmt.Errorf("get part checksum: %w", err)
        }

        // If a checksum has been provided on mp initialization
        err = validatePartChecksum(partChecksum, part)
        if err != nil {
            return res, "", err
        }
    }

    if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
        return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
    }

    // use totalsize=0 because we won't be writing to the file, only moving
    // extents around. so we don't want to fallocate this.
    f, err := s.openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, 0, acct)
    if err != nil {
        if errors.Is(err, syscall.EDQUOT) {
            return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
        }
        return res, "", fmt.Errorf("open temp file: %w", err)
    }
    defer f.cleanup()

    for _, part := range parts {
        if part.PartNumber == nil || *part.PartNumber < 1 {
            return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
        }

        partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
        fullPartPath := filepath.Join(bucket, partObjPath)
        pf, err := os.Open(fullPartPath)
        if err != nil {
            return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
        }

        // scoutfs move data is a metadata only operation that moves the data
        // extent references from the source, appending to the destination.
        // this needs to be 4k aligned.
        err = moveData(pf, f.File())
        pf.Close()
        if err != nil {
            return res, "", fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
        }
    }

    userMetaData := make(map[string]string)
    upiddir := filepath.Join(objdir, uploadID)
    objMeta := s.loadUserMetaData(bucket, upiddir, userMetaData)
    err = s.storeObjectMetadata(f.File(), bucket, object, objMeta)
    if err != nil {
        return res, "", err
    }

    objname := filepath.Join(bucket, object)
    dir := filepath.Dir(objname)
    if dir != "" {
        uid, gid, doChown := s.getChownIDs(acct)
        err = backend.MkdirAll(dir, uid, gid, doChown, s.newDirPerm)
        if err != nil {
            return res, "", err
        }
    }

    for k, v := range userMetaData {
        err = s.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
        if err != nil {
            return res, "", fmt.Errorf("set user attr %q: %w", k, err)
        }
    }

    // load and set tagging
    tagging, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
    if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
        return res, "", fmt.Errorf("get object tagging: %w", err)
    }
    if err == nil {
        err := s.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
        if err != nil {
            return res, "", fmt.Errorf("set object tagging: %w", err)
        }
    }

    // load and set legal hold
    lHold, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
    if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
        return res, "", fmt.Errorf("get object legal hold: %w", err)
    }
    if err == nil {
        err := s.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
        if err != nil {
            return res, "", fmt.Errorf("set object legal hold: %w", err)
        }
    }

    // load and set retention
    ret, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
    if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
        return res, "", fmt.Errorf("get object retention: %w", err)
    }
    if err == nil {
        err := s.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
        if err != nil {
            return res, "", fmt.Errorf("set object retention: %w", err)
        }
    }

    // Calculate s3 compatible md5sum for complete multipart.
    s3MD5 := backend.GetMultipartMD5(parts)

    err = s.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
    if err != nil {
        return res, "", fmt.Errorf("set etag attr: %w", err)
    }

    err = f.link()
    if err != nil {
        return res, "", fmt.Errorf("link object in namespace: %w", err)
    }

    // cleanup tmp dirs
    os.RemoveAll(filepath.Join(bucket, upiddir))
    // use Remove for objdir in case there are still other uploads
    // for same object name outstanding
    os.Remove(filepath.Join(bucket, objdir))

    return s3response.CompleteMultipartUploadResult{
        Bucket: &bucket,
        ETag:   &s3MD5,
        Key:    &object,
    }, "", nil
}

func (s *ScoutFS) storeObjectMetadata(f *os.File, bucket, object string, m objectMetadata) error {
    if getString(m.ContentType) != "" {
        err := s.meta.StoreAttribute(f, bucket, object, contentTypeHdr, []byte(*m.ContentType))
        if err != nil {
            return fmt.Errorf("set content-type: %w", err)
        }
    }
    if getString(m.ContentEncoding) != "" {
        err := s.meta.StoreAttribute(f, bucket, object, contentEncHdr, []byte(*m.ContentEncoding))
        if err != nil {
            return fmt.Errorf("set content-encoding: %w", err)
        }
    }
    if getString(m.ContentDisposition) != "" {
        err := s.meta.StoreAttribute(f, bucket, object, contentDispHdr, []byte(*m.ContentDisposition))
        if err != nil {
            return fmt.Errorf("set content-disposition: %w", err)
        }
    }
    if getString(m.ContentLanguage) != "" {
        err := s.meta.StoreAttribute(f, bucket, object, contentLangHdr, []byte(*m.ContentLanguage))
        if err != nil {
            return fmt.Errorf("set content-language: %w", err)
        }
    }
    if getString(m.CacheControl) != "" {
        err := s.meta.StoreAttribute(f, bucket, object, cacheCtrlHdr, []byte(*m.CacheControl))
        if err != nil {
            return fmt.Errorf("set cache-control: %w", err)
        }
    }
    if getString(m.Expires) != "" {
        err := s.meta.StoreAttribute(f, bucket, object, expiresHdr, []byte(*m.Expires))
        if err != nil {
            return fmt.Errorf("set expires: %w", err)
        }
    }

    return nil
}

func validatePartChecksum(checksum s3response.Checksum, part types.CompletedPart) error {
    n := numberOfChecksums(part)
    if n > 1 {
        return s3err.GetAPIError(s3err.ErrInvalidChecksumPart)
    }
    if checksum.Algorithm == "" {
        if n != 0 {
            return s3err.GetAPIError(s3err.ErrInvalidPart)
        }

        return nil
    }

    algo := checksum.Algorithm
    if n == 0 {
        return s3err.APIError{
            Code:           "InvalidRequest",
            Description:    fmt.Sprintf("The upload was created using a %v checksum. The complete request must include the checksum for each part. It was missing for part %v in the request.", strings.ToLower(string(algo)), *part.PartNumber),
            HTTPStatusCode: http.StatusBadRequest,
        }
    }

    for _, cs := range []struct {
        checksum         *string
        expectedChecksum string
        algo             types.ChecksumAlgorithm
    }{
        {part.ChecksumCRC32, getString(checksum.CRC32), types.ChecksumAlgorithmCrc32},
        {part.ChecksumCRC32C, getString(checksum.CRC32C), types.ChecksumAlgorithmCrc32c},
        {part.ChecksumSHA1, getString(checksum.SHA1), types.ChecksumAlgorithmSha1},
        {part.ChecksumSHA256, getString(checksum.SHA256), types.ChecksumAlgorithmSha256},
        {part.ChecksumCRC64NVME, getString(checksum.CRC64NVME), types.ChecksumAlgorithmCrc64nvme},
    } {
        if cs.checksum == nil {
            continue
        }

        if !utils.IsValidChecksum(*cs.checksum, cs.algo) {
            return s3err.GetAPIError(s3err.ErrInvalidChecksumPart)
        }

        if *cs.checksum != cs.expectedChecksum {
            if algo == cs.algo {
                return s3err.GetAPIError(s3err.ErrInvalidPart)
            }

            return s3err.APIError{
                Code:           "BadDigest",
                Description:    fmt.Sprintf("The %v you specified for part %v did not match what we received.", strings.ToLower(string(cs.algo)), *part.PartNumber),
                HTTPStatusCode: http.StatusBadRequest,
            }
        }
    }

    return nil
}

func numberOfChecksums(part types.CompletedPart) int {
    counter := 0
    if getString(part.ChecksumCRC32) != "" {
        counter++
    }
    if getString(part.ChecksumCRC32C) != "" {
        counter++
    }
    if getString(part.ChecksumSHA1) != "" {
        counter++
    }
    if getString(part.ChecksumSHA256) != "" {
        counter++
    }
    if getString(part.ChecksumCRC64NVME) != "" {
        counter++
    }

    return counter
}

func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) {
    sum := sha256.Sum256([]byte(object))
    objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))

    _, err := os.Stat(filepath.Join(objdir, uploadID))
    if errors.Is(err, fs.ErrNotExist) {
        return [32]byte{}, s3err.GetAPIError(s3err.ErrNoSuchUpload)
    }
    if err != nil {
        return [32]byte{}, fmt.Errorf("stat upload: %w", err)
    }
    return sum, nil
}

type objectMetadata struct {
    ContentType        *string
    ContentEncoding    *string
    ContentDisposition *string
    ContentLanguage    *string
    CacheControl       *string
    Expires            *string
}

// fill out the user metadata map with the metadata for the object
// and return the content type and encoding
func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) objectMetadata {
    ents, err := s.meta.ListAttributes(bucket, object)
    if err != nil || len(ents) == 0 {
        return objectMetadata{}
    }
    for _, e := range ents {
        if !isValidMeta(e) {
            continue
        }
        b, err := s.meta.RetrieveAttribute(nil, bucket, object, e)
        if err != nil {
            continue
        }
        if b == nil {
            m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = ""
            continue
        }
        m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = string(b)
    }

    var result objectMetadata

    b, err := s.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr)
    if err == nil {
        result.ContentType = backend.GetPtrFromString(string(b))
    }

    b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr)
    if err == nil {
        result.ContentEncoding = backend.GetPtrFromString(string(b))
    }

    b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentDispHdr)
    if err == nil {
        result.ContentDisposition = backend.GetPtrFromString(string(b))
    }

    b, err = s.meta.RetrieveAttribute(nil, bucket, object, contentLangHdr)
    if err == nil {
        result.ContentLanguage = backend.GetPtrFromString(string(b))
    }

    b, err = s.meta.RetrieveAttribute(nil, bucket, object, cacheCtrlHdr)
    if err == nil {
        result.CacheControl = backend.GetPtrFromString(string(b))
    }

    b, err = s.meta.RetrieveAttribute(nil, bucket, object, expiresHdr)
    if err == nil {
        result.Expires = backend.GetPtrFromString(string(b))
    }

    return result
}

func isValidMeta(val string) bool {
    if strings.HasPrefix(val, metaHdr) {
        return true
    }
    if strings.EqualFold(val, "Expires") {
        return true
    }
    return false
    return s.Posix.CompleteMultipartUploadWithCopy(ctx, input, moveData)
}

func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
@@ -727,173 +234,33 @@ func (s *ScoutFS) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.
}

func (s *ScoutFS) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) {
    bucket := *input.Bucket
    prefix := ""
    if input.Prefix != nil {
        prefix = *input.Prefix
    if s.glaciermode {
        return s.Posix.ListObjectsParametrized(ctx, input, s.glacierFileToObj)
    } else {
        return s.Posix.ListObjects(ctx, input)
    }
    marker := ""
    if input.Marker != nil {
        marker = *input.Marker
    }
    delim := ""
    if input.Delimiter != nil {
        delim = *input.Delimiter
    }
    maxkeys := int32(0)
    if input.MaxKeys != nil {
        maxkeys = *input.MaxKeys
    }

    _, err := os.Stat(bucket)
    if errors.Is(err, fs.ErrNotExist) {
        return s3response.ListObjectsResult{}, s3err.GetAPIError(s3err.ErrNoSuchBucket)
    }
    if err != nil {
        return s3response.ListObjectsResult{}, fmt.Errorf("stat bucket: %w", err)
    }

    fileSystem := os.DirFS(bucket)
    results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, maxkeys,
        s.fileToObj(bucket), []string{metaTmpDir})
    if err != nil {
        return s3response.ListObjectsResult{}, fmt.Errorf("walk %v: %w", bucket, err)
    }

    return s3response.ListObjectsResult{
        CommonPrefixes: results.CommonPrefixes,
        Contents:       results.Objects,
        Delimiter:      backend.GetPtrFromString(delim),
        Marker:         backend.GetPtrFromString(marker),
        NextMarker:     backend.GetPtrFromString(results.NextMarker),
        Prefix:         backend.GetPtrFromString(prefix),
        IsTruncated:    &results.Truncated,
        MaxKeys:        &maxkeys,
        Name:           &bucket,
    }, nil
}

func (s *ScoutFS) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error) {
    bucket := *input.Bucket
    prefix := ""
    if input.Prefix != nil {
        prefix = *input.Prefix
    }
    marker := ""
    if input.ContinuationToken != nil {
        if input.StartAfter != nil {
            marker = max(*input.StartAfter, *input.ContinuationToken)
    if s.glaciermode {
        return s.Posix.ListObjectsV2Parametrized(ctx, input, s.glacierFileToObj)
    } else {
            marker = *input.ContinuationToken
        return s.Posix.ListObjectsV2(ctx, input)
        }
    }
    delim := ""
    if input.Delimiter != nil {
        delim = *input.Delimiter
    }
    maxkeys := int32(0)
    if input.MaxKeys != nil {
        maxkeys = *input.MaxKeys
    }

    _, err := os.Stat(bucket)
    if errors.Is(err, fs.ErrNotExist) {
        return s3response.ListObjectsV2Result{}, s3err.GetAPIError(s3err.ErrNoSuchBucket)
    }
    if err != nil {
        return s3response.ListObjectsV2Result{}, fmt.Errorf("stat bucket: %w", err)
    }

    fileSystem := os.DirFS(bucket)
    results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, int32(maxkeys),
        s.fileToObj(bucket), []string{metaTmpDir})
    if err != nil {
        return s3response.ListObjectsV2Result{}, fmt.Errorf("walk %v: %w", bucket, err)
    }

    count := int32(len(results.Objects))

    return s3response.ListObjectsV2Result{
        CommonPrefixes:        results.CommonPrefixes,
        Contents:              results.Objects,
        IsTruncated:           &results.Truncated,
        MaxKeys:               &maxkeys,
        Name:                  &bucket,
        KeyCount:              &count,
        Delimiter:             backend.GetPtrFromString(delim),
        ContinuationToken:     backend.GetPtrFromString(marker),
        NextContinuationToken: backend.GetPtrFromString(results.NextMarker),
        Prefix:                backend.GetPtrFromString(prefix),
        StartAfter:            backend.GetPtrFromString(*input.StartAfter),
    }, nil
}

func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
// FileToObj function for ListObject calls that adds a Glacier storage class if the file is offline
func (s *ScoutFS) glacierFileToObj(bucket string, fetchOwner bool) backend.GetObjFunc {
    posixFileToObj := s.Posix.FileToObj(bucket, fetchOwner)

    return func(path string, d fs.DirEntry) (s3response.Object, error) {
        res, err := posixFileToObj(path, d)
        if err != nil || d.IsDir() {
            return res, err
        }
        objPath := filepath.Join(bucket, path)
        if d.IsDir() {
            // directory object only happens if directory empty
            // check to see if this is a directory object by checking etag
            etagBytes, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey)
            if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) {
                return s3response.Object{}, backend.ErrSkipObj
            }
            if err != nil {
                return s3response.Object{}, fmt.Errorf("get etag: %w", err)
            }
            etag := string(etagBytes)

            fi, err := d.Info()
            if errors.Is(err, fs.ErrNotExist) {
                return s3response.Object{}, backend.ErrSkipObj
            }
            if err != nil {
                return s3response.Object{}, fmt.Errorf("get fileinfo: %w", err)
            }

            size := int64(0)
            mtime := fi.ModTime()

            return s3response.Object{
                ETag:         &etag,
                Key:          &path,
                LastModified: &mtime,
                Size:         &size,
                StorageClass: types.ObjectStorageClassStandard,
            }, nil
        }

        // Retrieve the object checksum algorithm
        checksums, err := s.retrieveChecksums(nil, bucket, path)
        if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
            return s3response.Object{}, backend.ErrSkipObj
        }

        // file object, get object info and fill out object data
        b, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey)
        if errors.Is(err, fs.ErrNotExist) {
            return s3response.Object{}, backend.ErrSkipObj
        }
        if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
            return s3response.Object{}, fmt.Errorf("get etag: %w", err)
        }
        // note: meta.ErrNoSuchKey will return etagBytes = []byte{}
        // so this will just set etag to "" if it's not already set

        etag := string(b)

        fi, err := d.Info()
        if errors.Is(err, fs.ErrNotExist) {
            return s3response.Object{}, backend.ErrSkipObj
        }
        if err != nil {
            return s3response.Object{}, fmt.Errorf("get fileinfo: %w", err)
        }

        sc := types.ObjectStorageClassStandard
        if s.glaciermode {
            // Check if there are any offline extents associated with this file.
            // If so, we will return the InvalidObjectState error.
            // If so, we will return the Glacier storage class
            st, err := statMore(objPath)
            if errors.Is(err, fs.ErrNotExist) {
                return s3response.Object{}, backend.ErrSkipObj
@@ -902,33 +269,10 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
            return s3response.Object{}, fmt.Errorf("stat more: %w", err)
        }
        if st.Offline_blocks != 0 {
            sc = types.ObjectStorageClassGlacier
            res.StorageClass = types.ObjectStorageClassGlacier
        }
        return res, nil
    }

    size := fi.Size()
    mtime := fi.ModTime()

    return s3response.Object{
        ETag:              &etag,
        Key:               &path,
        LastModified:      &mtime,
        Size:              &size,
        StorageClass:      sc,
        ChecksumAlgorithm: []types.ChecksumAlgorithm{checksums.Algorithm},
        ChecksumType:      checksums.Type,
    }, nil
}
}

func (s *ScoutFS) retrieveChecksums(f *os.File, bucket, object string) (checksums s3response.Checksum, err error) {
    checksumsAtr, err := s.meta.RetrieveAttribute(f, bucket, object, checksumsKey)
    if err != nil {
        return checksums, err
    }

    err = json.Unmarshal(checksumsAtr, &checksums)
    return checksums, err
}

// RestoreObject will set stage request on file if offline and do nothing if
@@ -956,13 +300,6 @@ func (s *ScoutFS) RestoreObject(_ context.Context, input *s3.RestoreObjectInput)
    return nil
}

func getString(str *string) string {
    if str == nil {
        return ""
    }
    return *str
}

func isStaging(objname string) (bool, error) {
    b, err := xattr.Get(objname, flagskey)
    if err != nil && !isNoAttr(err) {

@@ -17,22 +17,13 @@
package scoutfs

import (
    "errors"
    "fmt"
    "io/fs"
    "os"
    "path/filepath"
    "strconv"
    "syscall"

    "golang.org/x/sys/unix"

    "github.com/versity/scoutfs-go"
    "github.com/versity/versitygw/auth"
    "github.com/versity/versitygw/backend"
    "github.com/versity/versitygw/backend/meta"
    "github.com/versity/versitygw/backend/posix"
    "github.com/versity/versitygw/s3err"
    "github.com/versity/versitygw/s3api/debuglogger"
)

func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
@@ -57,148 +48,31 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
        Posix:            p,
        rootfd:           f,
        rootdir:          rootdir,
        meta:             metastore,
        chownuid:         opts.ChownUID,
        chowngid:         opts.ChownGID,
        glaciermode:      opts.GlacierMode,
        newDirPerm:       opts.NewDirPerm,
        disableNoArchive: opts.DisableNoArchive,
    }, nil
}

const procfddir = "/proc/self/fd"

type tmpfile struct {
    f          *os.File
    bucket     string
    objname    string
    size       int64
    needsChown bool
    uid        int
    gid        int
    newDirPerm fs.FileMode
}

var (
    defaultFilePerm uint32 = 0644
)

func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Account) (*tmpfile, error) {
    uid, gid, doChown := s.getChownIDs(acct)

    // O_TMPFILE allows for a file handle to an unnamed file in the filesystem.
    // This can help reduce contention within the namespace (parent directories),
    // etc. And will auto cleanup the inode on close if we never link this
    // file descriptor into the namespace.
    fd, err := unix.Open(dir, unix.O_RDWR|unix.O_TMPFILE|unix.O_CLOEXEC, defaultFilePerm)
    if err != nil {
        if errors.Is(err, syscall.EROFS) {
            return nil, s3err.GetAPIError(s3err.ErrMethodNotAllowed)
        }
        return nil, err
    }

    // for O_TMPFILE, filename is /proc/self/fd/<fd> to be used
    // later to link file into namespace
    f := os.NewFile(uintptr(fd), filepath.Join(procfddir, strconv.Itoa(fd)))

    tmp := &tmpfile{
        f:          f,
        bucket:     bucket,
        objname:    obj,
        size:       size,
        needsChown: doChown,
        uid:        uid,
        gid:        gid,
        newDirPerm: s.newDirPerm,
    }

    if doChown {
        err := f.Chown(uid, gid)
        if err != nil {
            return nil, fmt.Errorf("set temp file ownership: %w", err)
        }
    }

    return tmp, nil
}

func (tmp *tmpfile) link() error {
    // We use Linkat/Rename as the atomic operation for object puts. The
    // upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict
    // with any other simultaneous uploads. The final operation is to move the
    // temp file into place for the object. This ensures the object semantics
    // of last upload completed wins and is not some combination of writes
    // from simultaneous uploads.
    objPath := filepath.Join(tmp.bucket, tmp.objname)
    err := os.Remove(objPath)
    if err != nil && !errors.Is(err, fs.ErrNotExist) {
        return fmt.Errorf("remove stale path: %w", err)
    }

    dir := filepath.Dir(objPath)

    err = backend.MkdirAll(dir, tmp.uid, tmp.gid, tmp.needsChown, tmp.newDirPerm)
    if err != nil {
        return fmt.Errorf("make parent dir: %w", err)
    }

    procdir, err := os.Open(procfddir)
    if err != nil {
        return fmt.Errorf("open proc dir: %w", err)
    }
    defer procdir.Close()

    dirf, err := os.Open(dir)
    if err != nil {
        return fmt.Errorf("open parent dir: %w", err)
    }
    defer dirf.Close()

    for {
        err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
            int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW)
        if errors.Is(err, fs.ErrExist) {
            err := os.Remove(objPath)
            if err != nil && !errors.Is(err, fs.ErrNotExist) {
                return fmt.Errorf("remove stale path: %w", err)
            }
            continue
        }
        if err != nil {
            return fmt.Errorf("link tmpfile: %w", err)
        }
        break
    }

    err = tmp.f.Close()
    if err != nil {
        return fmt.Errorf("close tmpfile: %w", err)
    }

    return nil
}

func (tmp *tmpfile) Write(b []byte) (int, error) {
    if int64(len(b)) > tmp.size {
        return 0, fmt.Errorf("write exceeds content length %v", tmp.size)
    }

    n, err := tmp.f.Write(b)
    tmp.size -= int64(n)
    return n, err
}

func (tmp *tmpfile) cleanup() {
    tmp.f.Close()
}

func (tmp *tmpfile) File() *os.File {
    return tmp.f
}

func moveData(from *os.File, to *os.File) error {
    return scoutfs.MoveData(from, to)
    // May fail if the files are not 4K aligned; check for alignment
    ffi, err := from.Stat()
    if err != nil {
        return fmt.Errorf("stat from: %v", err)
    }
    tfi, err := to.Stat()
    if err != nil {
        return fmt.Errorf("stat to: %v", err)
    }
    if ffi.Size()%4096 != 0 || tfi.Size()%4096 != 0 {
        return os.ErrInvalid
    }

    err = scoutfs.MoveData(from, to)
    if err != nil {
        debuglogger.Logf("ScoutFs MoveData failed: %v", err)
    }
    return err
}

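The alignment guard above matters because scoutfs.MoveData() splices whole 4 KiB extent references: it can only be attempted when both the source size and the destination's current size (the append offset) are multiples of 4096. The 5 MiB part minimum, 5242880 bytes, is exactly 1280 such blocks, so aligned parts stay on the fast path. Returning the error up front, rather than attempting the ioctl, lets CompleteMultipartUploadWithCopy fall back to io.Copy for that part. A minimal sketch of the interplay (illustrative helper, not the patch):

// moveOrCopy mirrors the gateway's fallback wiring: try the metadata-only
// splice first, then copy the bytes if the splice is refused.
func moveOrCopy(from, to *os.File) error {
    if err := moveData(from, to); err != nil {
        // e.g. non-4KiB-aligned sizes: rewind and copy instead
        if _, serr := from.Seek(0, io.SeekStart); serr != nil {
            return serr
        }
        _, cerr := io.Copy(to, from)
        return cerr
    }
    return nil
}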
func statMore(path string) (stat, error) {

@@ -20,44 +20,16 @@ import (
    "errors"
    "fmt"
    "os"

    "github.com/versity/versitygw/auth"
)

func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
    return nil, fmt.Errorf("scoutfs only available on linux")
}

type tmpfile struct{}

var (
    errNotSupported = errors.New("not supported")
)

func (s *ScoutFS) openTmpFile(_, _, _ string, _ int64, _ auth.Account) (*tmpfile, error) {
    // make these look used for static check
    _ = s.chownuid
    _ = s.chowngid
    _ = s.euid
    _ = s.egid
    return nil, errNotSupported
}

func (tmp *tmpfile) link() error {
    return errNotSupported
}

func (tmp *tmpfile) Write(b []byte) (int, error) {
    return 0, errNotSupported
}

func (tmp *tmpfile) cleanup() {
}

func (tmp *tmpfile) File() *os.File {
    return nil
}

func moveData(_, _ *os.File) error {
    return errNotSupported
}