fix: correct metadata, tags, and lock info for scoutfs multipart objects

Add meta.MetadataStorer compatibility to scoutfs so that scoutfs
uses the same interface as posix. This fixes metadata retrieval and
adds the recently introduced object lock support as well.
Ben McClelland committed 2024-06-10 17:42:55 -07:00
parent 7322309ea9 · commit 576dfc5884
3 changed files with 227 additions and 80 deletions
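For context, the storer interface shared by both backends, reconstructed from the calls in this diff (a sketch: only the three methods exercised below are shown, and the real meta package may declare more, e.g. delete operations):

```go
// Sketch of the meta.MetadataStorer shape as used in this commit.
type MetadataStorer interface {
	// StoreAttribute sets a named attribute on a bucket/object.
	StoreAttribute(bucket, object, attribute string, value []byte) error
	// RetrieveAttribute returns the value of a named attribute.
	RetrieveAttribute(bucket, object, attribute string) ([]byte, error)
	// ListAttributes lists the attribute names set on a bucket/object.
	ListAttributes(bucket, object string) ([]string, error)
}
```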


@@ -509,6 +509,10 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
partsize := int64(0)
var totalsize int64
for i, part := range parts {
+ if part.PartNumber == nil || *part.PartNumber < 1 {
+ return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+ }
partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
fi, err := os.Lstat(fullPartPath)
@@ -530,7 +534,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
if err != nil {
etag = ""
}
- if etag != *parts[i].ETag {
+ if parts[i].ETag == nil || etag != *parts[i].ETag {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
}
}
@@ -546,6 +550,10 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
defer f.cleanup()
for _, part := range parts {
+ if part.PartNumber == nil || *part.PartNumber < 1 {
+ return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+ }
partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
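
Both guards above exist because the AWS SDK models CompletedPart fields as pointers that may be nil in client requests. A hypothetical helper (not in the source) capturing the pattern, using the types and s3err packages already imported by this file:

```go
// validatePart is a hypothetical consolidation of the nil-safety checks
// added above: PartNumber and ETag must be checked before dereferencing.
func validatePart(part types.CompletedPart, computedETag string) error {
	if part.PartNumber == nil || *part.PartNumber < 1 {
		return s3err.GetAPIError(s3err.ErrInvalidPart)
	}
	if part.ETag == nil || computedETag != *part.ETag {
		return s3err.GetAPIError(s3err.ErrInvalidPart)
	}
	return nil
}
```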


@@ -26,12 +26,14 @@ import (
"path/filepath"
"strings"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/pkg/xattr"
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/backend/meta"
"github.com/versity/versitygw/backend/posix"
"github.com/versity/versitygw/s3err"
)
@@ -47,6 +49,9 @@ type ScoutFS struct {
rootfd *os.File
rootdir string
+ // bucket/object metadata storage facility
+ meta meta.MetadataStorer
// glaciermode enables the following behavior:
// GET object: if file offline, return invalid object state
// HEAD object: if file offline, set obj storage class to GLACIER
@@ -75,8 +80,13 @@ const (
metaTmpDir = ".sgwtmp"
metaTmpMultipartDir = metaTmpDir + "/multipart"
tagHdr = "X-Amz-Tagging"
metaHdr = "X-Amz-Meta"
contentTypeHdr = "content-type"
contentEncHdr = "content-encoding"
emptyMD5 = "d41d8cd98f00b204e9800998ecf8427e"
etagkey = "user.etag"
etagkey = "etag"
objectRetentionKey = "object-retention"
objectLegalHoldKey = "object-legal-hold"
)
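
The etagkey change from "user.etag" to "etag" works because the xattr-backed storer namespaces attribute names itself. A sketch of the assumed mapping (meta.XattrMeta's real implementation may differ, e.g. in error translation):

```go
// Sketch of an xattr-backed MetadataStorer: callers pass bare names like
// "etag" or "object-retention" and the storer adds the "user." xattr
// namespace, which is why the constant above drops the prefix.
type XattrMeta struct{}

func (XattrMeta) StoreAttribute(bucket, object, attribute string, value []byte) error {
	return xattr.Set(filepath.Join(bucket, object), "user."+attribute, value)
}

func (XattrMeta) RetrieveAttribute(bucket, object, attribute string) ([]byte, error) {
	return xattr.Get(filepath.Join(bucket, object), "user."+attribute)
}
```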
var (
@@ -87,11 +97,12 @@ var (
const (
// ScoutFS special xattr types
systemPrefix = "scoutfs.hide."
onameAttr = systemPrefix + "objname"
flagskey = systemPrefix + "sam_flags"
stagecopykey = systemPrefix + "sam_stagereq"
+ fsBlocksize = 4096
)
const (
@@ -179,18 +190,20 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
return nil, err
}
- objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
+ objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
// check all parts ok
last := len(parts) - 1
partsize := int64(0)
var totalsize int64
- for i, p := range parts {
- if p.PartNumber == nil {
+ for i, part := range parts {
+ if part.PartNumber == nil || *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
}
- partPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *p.PartNumber))
- fi, err := os.Lstat(partPath)
+ partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
+ fullPartPath := filepath.Join(bucket, partObjPath)
+ fi, err := os.Lstat(fullPartPath)
if err != nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
}
@@ -198,23 +211,25 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
if i == 0 {
partsize = fi.Size()
}
+ // partsize must be a multiple of the filesystem blocksize
+ // except for last part
+ if i < last && partsize%fsBlocksize != 0 {
+ return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+ }
totalsize += fi.Size()
// all parts except the last need to be the same size
if i < last && partsize != fi.Size() {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
}
- // non-last part sizes need to be multiples of 4k for move blocks
- // TODO: fallback to no move blocks if not 4k aligned?
- if i == 0 && i < last && fi.Size()%4096 != 0 {
- return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
- }
- b, err := xattr.Get(partPath, "user.etag")
+ b, err := s.meta.RetrieveAttribute(bucket, partObjPath, etagkey)
etag := string(b)
if err != nil {
etag = ""
}
- if etag != *parts[i].ETag {
+ if parts[i].ETag == nil || etag != *parts[i].ETag {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
}
}
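
The two non-last-part rules above can be read as a single predicate; a standalone restatement (illustrative only, not gateway API), using the fsBlocksize constant defined earlier:

```go
// partSizeOK restates the size rules enforced above: every part except
// the last must match the first part's size and be a multiple of the
// 4 KiB filesystem block size so move-blocks can splice whole extents.
func partSizeOK(i, last int, size, firstSize int64) bool {
	if i == last {
		return true // the last part may be any size
	}
	return size == firstSize && size%fsBlocksize == 0
}
```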
@@ -230,10 +245,16 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
}
defer f.cleanup()
- for _, p := range parts {
- pf, err := os.Open(filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *p.PartNumber)))
+ for _, part := range parts {
+ if part.PartNumber == nil || *part.PartNumber < 1 {
+ return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+ }
+ partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
+ fullPartPath := filepath.Join(bucket, partObjPath)
+ pf, err := os.Open(fullPartPath)
if err != nil {
return nil, fmt.Errorf("open part %v: %v", *p.PartNumber, err)
return nil, fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
// scoutfs move data is a metadata only operation that moves the data
@@ -242,13 +263,13 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
err = moveData(pf, f.f)
pf.Close()
if err != nil {
return nil, fmt.Errorf("move blocks part %v: %v", *p.PartNumber, err)
return nil, fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
}
}
userMetaData := make(map[string]string)
upiddir := filepath.Join(objdir, uploadID)
- loadUserMetaData(upiddir, userMetaData)
+ cType, _ := s.loadUserMetaData(bucket, upiddir, userMetaData)
objname := filepath.Join(bucket, object)
dir := filepath.Dir(objname)
@@ -265,7 +286,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
}
for k, v := range userMetaData {
err = xattr.Set(objname, "user."+k, []byte(v))
err = s.meta.StoreAttribute(bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
// cleanup object if returning error
os.Remove(objname)
@@ -273,10 +294,58 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
}
}
+ // load and set tagging
+ tagging, err := s.meta.RetrieveAttribute(bucket, upiddir, tagHdr)
+ if err == nil {
+ if err := s.meta.StoreAttribute(bucket, object, tagHdr, tagging); err != nil {
+ // cleanup object
+ os.Remove(objname)
+ return nil, fmt.Errorf("set object tagging: %w", err)
+ }
+ }
+ if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+ return nil, fmt.Errorf("get object tagging: %w", err)
+ }
+ // set content-type
+ if cType != "" {
+ if err := s.meta.StoreAttribute(bucket, object, contentTypeHdr, []byte(cType)); err != nil {
+ // cleanup object
+ os.Remove(objname)
+ return nil, fmt.Errorf("set object content type: %w", err)
+ }
+ }
+ // load and set legal hold
+ lHold, err := s.meta.RetrieveAttribute(bucket, upiddir, objectLegalHoldKey)
+ if err == nil {
+ if err := s.meta.StoreAttribute(bucket, object, objectLegalHoldKey, lHold); err != nil {
+ // cleanup object
+ os.Remove(objname)
+ return nil, fmt.Errorf("set object legal hold: %w", err)
+ }
+ }
+ if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+ return nil, fmt.Errorf("get object legal hold: %w", err)
+ }
+ // load and set retention
+ ret, err := s.meta.RetrieveAttribute(bucket, upiddir, objectRetentionKey)
+ if err == nil {
+ if err := s.meta.StoreAttribute(bucket, object, objectRetentionKey, ret); err != nil {
+ // cleanup object
+ os.Remove(objname)
+ return nil, fmt.Errorf("set object retention: %w", err)
+ }
+ }
+ if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
+ return nil, fmt.Errorf("get object retention: %w", err)
+ }
// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := backend.GetMultipartMD5(parts)
err = xattr.Set(objname, "user.etag", []byte(s3MD5))
err = s.meta.StoreAttribute(bucket, object, etagkey, []byte(s3MD5))
if err != nil {
// cleanup object if returning error
os.Remove(objname)
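
For reference, the S3 multipart ETag convention — presumably what backend.GetMultipartMD5 computes — is the MD5 of the concatenated binary part digests with the part count appended:

```go
import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"strings"

	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// multipartMD5 sketches the S3 convention: decode each part's hex MD5,
// concatenate the raw digests, MD5 the result, and append "-<N>".
func multipartMD5(parts []types.CompletedPart) string {
	var all []byte
	for _, p := range parts {
		etag := strings.Trim(*p.ETag, "\"") // part etags may be quoted
		b, _ := hex.DecodeString(etag)
		all = append(all, b...)
	}
	sum := md5.Sum(all)
	return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:]), len(parts))
}
```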
@@ -310,61 +379,104 @@ func (s *ScoutFS) checkUploadIDExists(bucket, object, uploadID string) ([32]byte
return sum, nil
}
- func loadUserMetaData(path string, m map[string]string) (contentType, contentEncoding string) {
- ents, err := xattr.List(path)
+ // fill out the user metadata map with the metadata for the object
+ // and return the content type and encoding
+ func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) (string, string) {
+ ents, err := s.meta.ListAttributes(bucket, object)
if err != nil || len(ents) == 0 {
- return
+ return "", ""
}
for _, e := range ents {
if !isValidMeta(e) {
continue
}
- b, err := xattr.Get(path, e)
- if err == errNoData {
- m[strings.TrimPrefix(e, "user.")] = ""
- continue
- }
+ b, err := s.meta.RetrieveAttribute(bucket, object, e)
if err != nil {
continue
}
- m[strings.TrimPrefix(e, "user.")] = string(b)
+ if b == nil {
+ m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = ""
+ continue
+ }
+ m[strings.TrimPrefix(e, fmt.Sprintf("%v.", metaHdr))] = string(b)
}
b, err := xattr.Get(path, "user.content-type")
var contentType, contentEncoding string
b, _ := s.meta.RetrieveAttribute(bucket, object, contentTypeHdr)
contentType = string(b)
if err != nil {
contentType = ""
}
if contentType != "" {
m["content-type"] = contentType
m[contentTypeHdr] = contentType
}
b, err = xattr.Get(path, "user.content-encoding")
b, _ = s.meta.RetrieveAttribute(bucket, object, contentEncHdr)
contentEncoding = string(b)
if err != nil {
contentEncoding = ""
}
if contentEncoding != "" {
m["content-encoding"] = contentEncoding
m[contentEncHdr] = contentEncoding
}
return
return contentType, contentEncoding
}
func isValidMeta(val string) bool {
if strings.HasPrefix(val, "user.X-Amz-Meta") {
if strings.HasPrefix(val, metaHdr) {
return true
}
if strings.EqualFold(val, "user.Expires") {
if strings.EqualFold(val, "Expires") {
return true
}
return false
}
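
Since the storer now returns bare attribute names, the filter matches unprefixed values; an in-package test sketch (not part of the commit) of what it admits:

```go
// illustrative in-package test: only user metadata headers and the
// special-cased Expires header count as object metadata
func TestIsValidMeta(t *testing.T) {
	if !isValidMeta("X-Amz-Meta-Color") {
		t.Error("X-Amz-Meta-* attributes are user metadata")
	}
	if !isValidMeta("Expires") {
		t.Error("Expires is special-cased as metadata")
	}
	if isValidMeta("etag") {
		t.Error("internal attributes like etag are not user metadata")
	}
}
```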
- func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
+ func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
+ if input.Bucket == nil {
+ return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
+ }
+ if input.Key == nil {
+ return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
+ }
bucket := *input.Bucket
object := *input.Key
+ if input.PartNumber != nil {
+ uploadId, sum, err := s.retrieveUploadId(bucket, object)
+ if err != nil {
+ return nil, err
+ }
+ ents, err := os.ReadDir(filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId))
+ if errors.Is(err, fs.ErrNotExist) {
+ return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("read parts: %w", err)
+ }
+ partPath := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadId, fmt.Sprintf("%v", *input.PartNumber))
+ part, err := os.Stat(filepath.Join(bucket, partPath))
+ if errors.Is(err, fs.ErrNotExist) {
+ return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("stat part: %w", err)
+ }
+ b, err := s.meta.RetrieveAttribute(bucket, partPath, etagkey)
+ etag := string(b)
+ if err != nil {
+ etag = ""
+ }
+ partsCount := int32(len(ents))
+ size := part.Size()
+ return &s3.HeadObjectOutput{
+ LastModified: backend.GetTimePtr(part.ModTime()),
+ ETag: &etag,
+ PartsCount: &partsCount,
+ ContentLength: &size,
+ }, nil
+ }
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
@@ -383,9 +495,14 @@ func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.
}
userMetaData := make(map[string]string)
- contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
+ contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)
- b, err := xattr.Get(objPath, etagkey)
+ if fi.IsDir() {
+ // this is the media type for directories in AWS and Nextcloud
+ contentType = "application/x-directory"
+ }
+ b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -424,18 +541,54 @@ func (s *ScoutFS) HeadObject(_ context.Context, input *s3.HeadObjectInput) (*s3.
contentLength := fi.Size()
+ var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
+ status, err := s.Posix.GetObjectLegalHold(ctx, bucket, object, "")
+ if err == nil {
+ if *status {
+ objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOn
+ } else {
+ objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOff
+ }
+ }
+ var objectLockMode types.ObjectLockMode
+ var objectLockRetainUntilDate *time.Time
+ retention, err := s.Posix.GetObjectRetention(ctx, bucket, object, "")
+ if err == nil {
+ var config types.ObjectLockRetention
+ if err := json.Unmarshal(retention, &config); err == nil {
+ objectLockMode = types.ObjectLockMode(config.Mode)
+ objectLockRetainUntilDate = config.RetainUntilDate
+ }
+ }
return &s3.HeadObjectOutput{
ContentLength: &contentLength,
ContentType: &contentType,
ContentEncoding: &contentEncoding,
ETag: &etag,
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: userMetaData,
StorageClass: stclass,
Restore: &requestOngoing,
ContentLength: &contentLength,
ContentType: &contentType,
ContentEncoding: &contentEncoding,
ETag: &etag,
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: userMetaData,
StorageClass: stclass,
Restore: &requestOngoing,
ObjectLockLegalHoldStatus: objectLockLegalHoldStatus,
ObjectLockMode: objectLockMode,
ObjectLockRetainUntilDate: objectLockRetainUntilDate,
}, nil
}
+ func (s *ScoutFS) retrieveUploadId(bucket, object string) (string, [32]byte, error) {
+ sum := sha256.Sum256([]byte(object))
+ objdir := filepath.Join(bucket, metaTmpMultipartDir, fmt.Sprintf("%x", sum))
+ entries, err := os.ReadDir(objdir)
+ if err != nil || len(entries) == 0 {
+ return "", [32]byte{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
+ }
+ return entries[0].Name(), sum, nil
+ }
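
Client-side, the new PartNumber path services requests like the following (hypothetical aws-sdk-go-v2 usage; bucket and key names are placeholders):

```go
import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// headPart asks for a single part's metadata; with the change above the
// scoutfs gateway answers with the part size, etag, and total part count.
func headPart(ctx context.Context, client *s3.Client) error {
	out, err := client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket:     aws.String("example-bucket"),
		Key:        aws.String("example-key"),
		PartNumber: aws.Int32(1),
	})
	if err != nil {
		return err
	}
	log.Printf("part size %d, etag %s, %d parts total",
		*out.ContentLength, *out.ETag, *out.PartsCount)
	return nil
}
```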
func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer io.Writer) (*s3.GetObjectOutput, error) {
bucket := *input.Bucket
object := *input.Key
@@ -515,9 +668,9 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput, writer
userMetaData := make(map[string]string)
- contentType, contentEncoding := loadUserMetaData(objPath, userMetaData)
+ contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)
- b, err := xattr.Get(objPath, etagkey)
+ b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -671,14 +824,11 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
if d.IsDir() {
// directory object only happens if directory empty
// check to see if this is a directory object by checking etag
- etagBytes, err := xattr.Get(objPath, etagkey)
- if isNoAttr(err) || errors.Is(err, fs.ErrNotExist) {
- return types.Object{}, backend.ErrSkipObj
- }
+ b, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
if err != nil {
return types.Object{}, fmt.Errorf("get etag: %w", err)
}
- etag := string(etagBytes)
+ etag := string(b)
fi, err := d.Info()
if errors.Is(err, fs.ErrNotExist) {
@@ -698,14 +848,14 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
}
// file object, get object info and fill out object data
- etagBytes, err := xattr.Get(objPath, etagkey)
+ b, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
if errors.Is(err, fs.ErrNotExist) {
return types.Object{}, backend.ErrSkipObj
}
- if err != nil && !isNoAttr(err) {
+ if err != nil {
return types.Object{}, fmt.Errorf("get etag: %w", err)
}
- etag := string(etagBytes)
+ etag := string(b)
fi, err := d.Info()
if errors.Is(err, fs.ErrNotExist) {


@@ -23,7 +23,6 @@ import (
"os"
"path/filepath"
"strconv"
"syscall"
"golang.org/x/sys/unix"
@@ -35,7 +34,9 @@ import (
)
func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
- p, err := posix.New(rootdir, meta.XattrMeta{}, posix.PosixOpts{
+ metastore := meta.XattrMeta{}
+ p, err := posix.New(rootdir, metastore, posix.PosixOpts{
ChownUID: opts.ChownUID,
ChownGID: opts.ChownGID,
})
@@ -52,6 +53,7 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
Posix: p,
rootfd: f,
rootdir: rootdir,
+ meta: metastore,
chownuid: opts.ChownUID,
chowngid: opts.ChownGID,
}, nil
@@ -100,11 +102,6 @@ func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Acc
gid: gid,
}
- // falloc is best effort, its fine if this fails
- if size > 0 {
- tmp.falloc()
- }
if doChown {
err := f.Chown(uid, gid)
if err != nil {
@@ -115,14 +112,6 @@ func (s *ScoutFS) openTmpFile(dir, bucket, obj string, size int64, acct auth.Acc
return tmp, nil
}
- func (tmp *tmpfile) falloc() error {
- err := syscall.Fallocate(int(tmp.f.Fd()), 0, 0, tmp.size)
- if err != nil {
- return fmt.Errorf("fallocate: %v", err)
- }
- return nil
- }
func (tmp *tmpfile) link() error {
// We use Linkat/Rename as the atomic operation for object puts. The
// upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict
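
The pattern the comment describes, sketched minimally for Linux with golang.org/x/sys/unix (the gateway's real tmpfile code handles more cases, e.g. filesystems without O_TMPFILE support):

```go
import (
	"fmt"
	"os"
	"path/filepath"

	"golang.org/x/sys/unix"
)

// atomicPublish writes data to an unnamed O_TMPFILE in dir, then links it
// into place with Linkat so readers never observe a partial object.
func atomicPublish(dir, name string, data []byte) error {
	fd, err := unix.Open(dir, unix.O_TMPFILE|unix.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	f := os.NewFile(uintptr(fd), "tmpfile")
	defer f.Close()
	if _, err := f.Write(data); err != nil {
		return err
	}
	// the unnamed file is reachable via its /proc fd path
	src := fmt.Sprintf("/proc/self/fd/%d", fd)
	return unix.Linkat(unix.AT_FDCWD, src,
		unix.AT_FDCWD, filepath.Join(dir, name), unix.AT_SYMLINK_FOLLOW)
}
```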