fix: unexpected errors during upload races

This fixes the cases for racing uploads with the same object names.
Previously we made some incorrect assumptions about what could cause
an error when linking/renaming the final object name into the
namespace, and missed the case where another upload for the same
name races with this upload, which produced a spurious error.

This also changes the order of setting metadata to prevent
accidental setting of metadata for the current upload to another
racing upload.

This also fixes auth.CheckObjectAccess() to handle objects that are
removed while the check is running.

Fixes #854
This commit is contained in:
Ben McClelland
2024-10-04 21:40:39 -07:00
parent d2b0d24520
commit b7a2e8a2c3
15 changed files with 614 additions and 291 deletions

View File

@@ -227,6 +227,9 @@ func CheckObjectAccess(ctx context.Context, bucket, userAccess string, objects [
status, err := be.GetObjectLegalHold(ctx, bucket, key, versionId)
if err != nil {
if errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)) {
continue
}
if errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchObjectLockConfiguration)) {
checkLegalHold = false
} else {

View File

@@ -14,17 +14,19 @@
package meta
import "os"
// MetadataStorer defines the interface for managing metadata.
// When object == "", the operation is on the bucket.
type MetadataStorer interface {
// RetrieveAttribute retrieves the value of a specific attribute for an object or a bucket.
// Returns the value of the attribute, or an error if the attribute does not exist.
RetrieveAttribute(bucket, object, attribute string) ([]byte, error)
RetrieveAttribute(f *os.File, bucket, object, attribute string) ([]byte, error)
// StoreAttribute stores the value of a specific attribute for an object or a bucket.
// If attribute already exists, new attribute should replace existing.
// Returns an error if the operation fails.
StoreAttribute(bucket, object, attribute string, value []byte) error
StoreAttribute(f *os.File, bucket, object, attribute string, value []byte) error
// DeleteAttribute removes the value of a specific attribute for an object or a bucket.
// Returns an error if the operation fails.

View File

@@ -17,6 +17,7 @@ package meta
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"syscall"
@@ -36,7 +37,15 @@ var (
type XattrMeta struct{}
// RetrieveAttribute retrieves the value of a specific attribute for an object in a bucket.
func (x XattrMeta) RetrieveAttribute(bucket, object, attribute string) ([]byte, error) {
func (x XattrMeta) RetrieveAttribute(f *os.File, bucket, object, attribute string) ([]byte, error) {
if f != nil {
b, err := xattr.FGet(f, xattrPrefix+attribute)
if errors.Is(err, xattr.ENOATTR) {
return nil, ErrNoSuchKey
}
return b, err
}
b, err := xattr.Get(filepath.Join(bucket, object), xattrPrefix+attribute)
if errors.Is(err, xattr.ENOATTR) {
return nil, ErrNoSuchKey
@@ -45,7 +54,11 @@ func (x XattrMeta) RetrieveAttribute(bucket, object, attribute string) ([]byte,
}
// StoreAttribute stores the value of a specific attribute for an object in a bucket.
func (x XattrMeta) StoreAttribute(bucket, object, attribute string, value []byte) error {
func (x XattrMeta) StoreAttribute(f *os.File, bucket, object, attribute string, value []byte) error {
// Prefer the fd-based xattr call when the caller supplies an open
// handle: it writes to the exact open file even if its path has since
// been renamed or unlinked — presumably to avoid racing concurrent
// uploads of the same object name (see commit message); confirm with
// callers that pass f.
if f != nil {
return xattr.FSet(f, xattrPrefix+attribute, value)
}
// No handle available: fall back to the path-based set on the
// object's location under the bucket directory.
return xattr.Set(filepath.Join(bucket, object), xattrPrefix+attribute, value)
}

View File

@@ -153,10 +153,10 @@ func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, erro
if !vDir.IsDir() {
return nil, fmt.Errorf("versioning path should be a directory")
}
fmt.Printf("Bucket versioning enabled with directory: %v\n", verioningdirAbs)
}
fmt.Printf("Bucket versioning enabled with directory: %v\n", verioningdirAbs)
return &Posix{
meta: meta,
rootfd: f,
@@ -220,7 +220,7 @@ func (p *Posix) ListBuckets(_ context.Context, owner string, isAdmin bool) (s3re
continue
}
aclTag, err := p.meta.RetrieveAttribute(entry.Name(), "", aclkey)
aclTag, err := p.meta.RetrieveAttribute(nil, entry.Name(), "", aclkey)
if errors.Is(err, meta.ErrNoSuchKey) {
// skip buckets without acl tag
continue
@@ -292,7 +292,7 @@ func (p *Posix) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, a
err := os.Mkdir(bucket, defaultDirPerm)
if err != nil && os.IsExist(err) {
aclJSON, err := p.meta.RetrieveAttribute(bucket, "", aclkey)
aclJSON, err := p.meta.RetrieveAttribute(nil, bucket, "", aclkey)
if err != nil {
return fmt.Errorf("get bucket acl: %w", err)
}
@@ -317,10 +317,12 @@ func (p *Posix) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, a
}
}
if err := p.meta.StoreAttribute(bucket, "", aclkey, acl); err != nil {
err = p.meta.StoreAttribute(nil, bucket, "", aclkey, acl)
if err != nil {
return fmt.Errorf("set acl: %w", err)
}
if err := p.meta.StoreAttribute(bucket, "", ownershipkey, []byte(input.ObjectOwnership)); err != nil {
err = p.meta.StoreAttribute(nil, bucket, "", ownershipkey, []byte(input.ObjectOwnership))
if err != nil {
return fmt.Errorf("set ownership: %w", err)
}
@@ -345,7 +347,8 @@ func (p *Posix) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, a
return fmt.Errorf("parse default bucket lock state: %w", err)
}
if err := p.meta.StoreAttribute(bucket, "", bucketLockKey, defaultLockParsed); err != nil {
err = p.meta.StoreAttribute(nil, bucket, "", bucketLockKey, defaultLockParsed)
if err != nil {
return fmt.Errorf("set default bucket lock: %w", err)
}
}
@@ -400,7 +403,8 @@ func (p *Posix) PutBucketOwnershipControls(_ context.Context, bucket string, own
return fmt.Errorf("stat bucket: %w", err)
}
if err := p.meta.StoreAttribute(bucket, "", ownershipkey, []byte(ownership)); err != nil {
err = p.meta.StoreAttribute(nil, bucket, "", ownershipkey, []byte(ownership))
if err != nil {
return fmt.Errorf("set ownership: %w", err)
}
@@ -416,7 +420,7 @@ func (p *Posix) GetBucketOwnershipControls(_ context.Context, bucket string) (ty
return ownship, fmt.Errorf("stat bucket: %w", err)
}
ownership, err := p.meta.RetrieveAttribute(bucket, "", ownershipkey)
ownership, err := p.meta.RetrieveAttribute(nil, bucket, "", ownershipkey)
if errors.Is(err, meta.ErrNoSuchKey) {
return ownship, s3err.GetAPIError(s3err.ErrOwnershipControlsNotFound)
}
@@ -435,7 +439,8 @@ func (p *Posix) DeleteBucketOwnershipControls(_ context.Context, bucket string)
return fmt.Errorf("stat bucket: %w", err)
}
if err := p.meta.DeleteAttribute(bucket, "", ownershipkey); err != nil {
err = p.meta.DeleteAttribute(bucket, "", ownershipkey)
if err != nil {
if errors.Is(err, meta.ErrNoSuchKey) {
return nil
}
@@ -483,7 +488,8 @@ func (p *Posix) PutBucketVersioning(ctx context.Context, bucket string, status t
versioning = []byte{0}
}
if err := p.meta.StoreAttribute(bucket, "", versioningKey, versioning); err != nil {
err = p.meta.StoreAttribute(nil, bucket, "", versioningKey, versioning)
if err != nil {
return fmt.Errorf("set versioning: %w", err)
}
@@ -503,7 +509,7 @@ func (p *Posix) GetBucketVersioning(_ context.Context, bucket string) (s3respons
return s3response.GetBucketVersioningOutput{}, fmt.Errorf("stat bucket: %w", err)
}
vData, err := p.meta.RetrieveAttribute(bucket, "", versioningKey)
vData, err := p.meta.RetrieveAttribute(nil, bucket, "", versioningKey)
if errors.Is(err, meta.ErrNoSuchKey) {
return s3response.GetBucketVersioningOutput{}, nil
} else if err != nil {
@@ -563,7 +569,7 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun
defer sf.Close()
var versionId string
data, err := p.meta.RetrieveAttribute(bucket, key, versionIdKey)
data, err := p.meta.RetrieveAttribute(sf, bucket, key, versionIdKey)
if err == nil {
versionId = string(data)
} else {
@@ -596,22 +602,23 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun
return versionPath, err
}
if err := f.link(); err != nil {
return versionPath, err
}
// Copy the object attributes(metadata)
for _, attr := range attrs {
data, err := p.meta.RetrieveAttribute(bucket, key, attr)
data, err := p.meta.RetrieveAttribute(sf, bucket, key, attr)
if err != nil {
return versionPath, fmt.Errorf("list %v attribute: %w", attr, err)
}
if err := p.meta.StoreAttribute(versionPath, "", attr, data); err != nil {
err = p.meta.StoreAttribute(f.File(), versionPath, "", attr, data)
if err != nil {
return versionPath, fmt.Errorf("store %v attribute: %w", attr, err)
}
}
if err := f.link(); err != nil {
return versionPath, err
}
return versionPath, nil
}
@@ -673,7 +680,7 @@ func getBoolPtr(b bool) *bool {
// Check if the given object is a delete marker
func (p *Posix) isObjDeleteMarker(bucket, object string) (bool, error) {
_, err := p.meta.RetrieveAttribute(bucket, object, deleteMarkerKey)
_, err := p.meta.RetrieveAttribute(nil, bucket, object, deleteMarkerKey)
if errors.Is(err, fs.ErrNotExist) {
return false, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -704,7 +711,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
if d.IsDir() {
// directory object only happens if directory empty
// check to see if this is a directory object by checking etag
etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey)
etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
}
@@ -744,7 +751,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
}
// file object, get object info and fill out object data
etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey)
etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
if errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
}
@@ -757,7 +764,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
// If the object doesn't have versionId, it's 'null'
versionId := "null"
versionIdBytes, err := p.meta.RetrieveAttribute(bucket, path, versionIdKey)
versionIdBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, versionIdKey)
if err == nil {
versionId = string(versionIdBytes)
}
@@ -858,7 +865,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
continue
}
etagBytes, err := p.meta.RetrieveAttribute(versionPath, versionId, etagkey)
etagBytes, err := p.meta.RetrieveAttribute(nil, versionPath, versionId, etagkey)
if errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
}
@@ -971,7 +978,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa
// set an attribute with the original object name so that we can
// map the hashed name back to the original object name
err = p.meta.StoreAttribute(bucket, objdir, onameAttr, []byte(object))
err = p.meta.StoreAttribute(nil, bucket, objdir, onameAttr, []byte(object))
if err != nil {
// if we fail, cleanup the container directories
// but ignore errors because there might still be
@@ -983,7 +990,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa
// set user metadata
for k, v := range mpu.Metadata {
err := p.meta.StoreAttribute(bucket, filepath.Join(objdir, uploadID),
err := p.meta.StoreAttribute(nil, bucket, filepath.Join(objdir, uploadID),
fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
// cleanup object if returning error
@@ -1007,7 +1014,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa
// set content-type
ctype := getString(mpu.ContentType)
if ctype != "" {
err := p.meta.StoreAttribute(bucket, filepath.Join(objdir, uploadID),
err := p.meta.StoreAttribute(nil, bucket, filepath.Join(objdir, uploadID),
contentTypeHdr, []byte(*mpu.ContentType))
if err != nil {
// cleanup object if returning error
@@ -1020,7 +1027,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa
// set content-encoding
cenc := getString(mpu.ContentEncoding)
if cenc != "" {
err := p.meta.StoreAttribute(bucket, filepath.Join(objdir, uploadID), contentEncHdr,
err := p.meta.StoreAttribute(nil, bucket, filepath.Join(objdir, uploadID), contentEncHdr,
[]byte(*mpu.ContentEncoding))
if err != nil {
// cleanup object if returning error
@@ -1032,7 +1039,8 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa
// set object legal hold
if mpu.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
if err := p.PutObjectLegalHold(ctx, bucket, filepath.Join(objdir, uploadID), "", true); err != nil {
err := p.PutObjectLegalHold(ctx, bucket, filepath.Join(objdir, uploadID), "", true)
if err != nil {
// cleanup object if returning error
os.RemoveAll(filepath.Join(tmppath, uploadID))
os.Remove(tmppath)
@@ -1053,7 +1061,8 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa
os.Remove(tmppath)
return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("parse object lock retention: %w", err)
}
if err := p.PutObjectRetention(ctx, bucket, filepath.Join(objdir, uploadID), "", true, retParsed); err != nil {
err = p.PutObjectRetention(ctx, bucket, filepath.Join(objdir, uploadID), "", true, retParsed)
if err != nil {
// cleanup object if returning error
os.RemoveAll(filepath.Join(tmppath, uploadID))
os.Remove(tmppath)
@@ -1151,7 +1160,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
}
b, err := p.meta.RetrieveAttribute(bucket, partObjPath, etagkey)
b, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -1221,97 +1230,89 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
}
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
}
// if the versioning is enabled, generate a new versionID for the object
var versionID string
if p.versioningEnabled() && vEnabled {
versionID = ulid.Make().String()
if err := p.meta.StoreAttribute(bucket, object, versionIdKey, []byte(versionID)); err != nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, versionIdKey, []byte(versionID))
if err != nil {
return nil, fmt.Errorf("set versionId attr: %w", err)
}
}
for k, v := range userMetaData {
err = p.meta.StoreAttribute(bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
err = p.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
// cleanup object if returning error
os.Remove(objname)
return nil, fmt.Errorf("set user attr %q: %w", k, err)
}
}
// load and set tagging
tagging, err := p.meta.RetrieveAttribute(bucket, upiddir, tagHdr)
if err == nil {
if err := p.meta.StoreAttribute(bucket, object, tagHdr, tagging); err != nil {
// cleanup object
os.Remove(objname)
return nil, fmt.Errorf("set object tagging: %w", err)
}
}
tagging, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object tagging: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
if err != nil {
return nil, fmt.Errorf("set object tagging: %w", err)
}
}
// set content-type
if cType != "" {
if err := p.meta.StoreAttribute(bucket, object, contentTypeHdr, []byte(cType)); err != nil {
// cleanup object
os.Remove(objname)
err := p.meta.StoreAttribute(f.File(), bucket, object, contentTypeHdr, []byte(cType))
if err != nil {
return nil, fmt.Errorf("set object content type: %w", err)
}
}
// set content-encoding
if cEnc != "" {
if err := p.meta.StoreAttribute(bucket, object, contentEncHdr, []byte(cEnc)); err != nil {
// cleanup object
os.Remove(objname)
err := p.meta.StoreAttribute(f.File(), bucket, object, contentEncHdr, []byte(cEnc))
if err != nil {
return nil, fmt.Errorf("set object content encoding: %w", err)
}
}
// load and set legal hold
lHold, err := p.meta.RetrieveAttribute(bucket, upiddir, objectLegalHoldKey)
if err == nil {
if err := p.meta.StoreAttribute(bucket, object, objectLegalHoldKey, lHold); err != nil {
// cleanup object
os.Remove(objname)
return nil, fmt.Errorf("set object legal hold: %w", err)
}
}
lHold, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object legal hold: %w", err)
}
// load and set retention
ret, err := p.meta.RetrieveAttribute(bucket, upiddir, objectRetentionKey)
if err == nil {
if err := p.meta.StoreAttribute(bucket, object, objectRetentionKey, ret); err != nil {
// cleanup object
os.Remove(objname)
return nil, fmt.Errorf("set object retention: %w", err)
err := p.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
if err != nil {
return nil, fmt.Errorf("set object legal hold: %w", err)
}
}
// load and set retention
ret, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object retention: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
if err != nil {
return nil, fmt.Errorf("set object retention: %w", err)
}
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := backend.GetMultipartMD5(parts)
err = p.meta.StoreAttribute(bucket, object, etagkey, []byte(s3MD5))
err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
// cleanup object if returning error
os.Remove(objname)
return nil, fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
}
// cleanup tmp dirs
os.RemoveAll(filepath.Join(bucket, objdir, uploadID))
// use Remove for objdir in case there are still other uploads
@@ -1363,7 +1364,7 @@ func (p *Posix) loadUserMetaData(bucket, object string, m map[string]string) (st
if !isValidMeta(e) {
continue
}
b, err := p.meta.RetrieveAttribute(bucket, object, e)
b, err := p.meta.RetrieveAttribute(nil, bucket, object, e)
if err != nil {
continue
}
@@ -1375,10 +1376,10 @@ func (p *Posix) loadUserMetaData(bucket, object string, m map[string]string) (st
}
var contentType, contentEncoding string
b, _ := p.meta.RetrieveAttribute(bucket, object, contentTypeHdr)
b, _ := p.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr)
contentType = string(b)
b, _ = p.meta.RetrieveAttribute(bucket, object, contentEncHdr)
b, _ = p.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr)
contentEncoding = string(b)
return contentType, contentEncoding
@@ -1480,7 +1481,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl
continue
}
b, err := p.meta.RetrieveAttribute(bucket, filepath.Join(metaTmpMultipartDir, obj.Name()), onameAttr)
b, err := p.meta.RetrieveAttribute(nil, bucket, filepath.Join(metaTmpMultipartDir, obj.Name()), onameAttr)
if err != nil {
continue
}
@@ -1650,7 +1651,7 @@ func (p *Posix) ListParts(_ context.Context, input *s3.ListPartsInput) (s3respon
}
partPath := filepath.Join(objdir, uploadID, e.Name())
b, err := p.meta.RetrieveAttribute(bucket, partPath, etagkey)
b, err := p.meta.RetrieveAttribute(nil, bucket, partPath, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -1752,6 +1753,7 @@ func (p *Posix) UploadPart(ctx context.Context, input *s3.UploadPartInput) (stri
}
return "", fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
hash := md5.New()
tr := io.TeeReader(r, hash)
@@ -1763,20 +1765,18 @@ func (p *Posix) UploadPart(ctx context.Context, input *s3.UploadPartInput) (stri
return "", fmt.Errorf("write part data: %w", err)
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum)
err = p.meta.StoreAttribute(f.File(), bucket, partPath, etagkey, []byte(etag))
if err != nil {
return "", fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return "", fmt.Errorf("link object in namespace: %w", err)
}
f.cleanup()
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum)
err = p.meta.StoreAttribute(bucket, partPath, etagkey, []byte(etag))
if err != nil {
return "", fmt.Errorf("set etag attr: %w", err)
}
return etag, nil
}
@@ -1839,7 +1839,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
if !p.versioningEnabled() || !vEnabled {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
vId, err := p.meta.RetrieveAttribute(srcBucket, srcObject, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, srcBucket, srcObject, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -1912,18 +1912,18 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
return s3response.CopyObjectResult{}, fmt.Errorf("copy part data: %w", err)
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum)
err = p.meta.StoreAttribute(f.File(), *upi.Bucket, partPath, etagkey, []byte(etag))
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("link object in namespace: %w", err)
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum)
err = p.meta.StoreAttribute(*upi.Bucket, partPath, etagkey, []byte(etag))
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("set etag attr: %w", err)
}
fi, err = os.Stat(filepath.Join(*upi.Bucket, partPath))
if err != nil {
return s3response.CopyObjectResult{}, fmt.Errorf("stat part path: %w", err)
@@ -1999,7 +1999,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}
for k, v := range po.Metadata {
err := p.meta.StoreAttribute(*po.Bucket, *po.Key,
err := p.meta.StoreAttribute(nil, *po.Bucket, *po.Key,
fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set user attr %q: %w", k, err)
@@ -2007,7 +2007,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}
// set etag attribute to signify this dir was specifically put
err = p.meta.StoreAttribute(*po.Bucket, *po.Key, etagkey,
err = p.meta.StoreAttribute(nil, *po.Bucket, *po.Key, etagkey,
[]byte(emptyMD5))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err)
@@ -2066,6 +2066,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}
return s3response.PutObjectOutput{}, fmt.Errorf("write object data: %w", err)
}
dir := filepath.Dir(name)
if dir != "" {
err = backend.MkdirAll(dir, uid, gid, doChown)
@@ -2074,22 +2075,73 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}
}
err = f.link()
if err != nil {
return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory)
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum[:])
// if the versioning is enabled, generate a new versionID for the object
var versionID string
if p.versioningEnabled() && vEnabled {
versionID = ulid.Make().String()
}
for k, v := range po.Metadata {
err := p.meta.StoreAttribute(*po.Bucket, *po.Key,
err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key,
fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set user attr %q: %w", k, err)
}
}
err = p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, etagkey, []byte(etag))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err)
}
ctype := getString(po.ContentType)
if ctype != "" {
err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, contentTypeHdr,
[]byte(*po.ContentType))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set content-type attr: %w", err)
}
}
cenc := getString(po.ContentEncoding)
if cenc != "" {
err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, contentEncHdr,
[]byte(*po.ContentEncoding))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set content-encoding attr: %w", err)
}
}
if versionID != "" {
err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, versionIdKey, []byte(versionID))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set versionId attr: %w", err)
}
}
err = f.link()
if errors.Is(err, syscall.EEXIST) {
return s3response.PutObjectOutput{
ETag: etag,
VersionID: versionID,
}, nil
}
if err != nil {
return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory)
}
// Set object tagging
if tagsStr != "" {
err := p.PutObjectTagging(ctx, *po.Bucket, *po.Key, tags)
if errors.Is(err, fs.ErrNotExist) {
return s3response.PutObjectOutput{
ETag: etag,
VersionID: versionID,
}, nil
}
if err != nil {
return s3response.PutObjectOutput{}, err
}
@@ -2097,7 +2149,8 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
// Set object legal hold
if po.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
if err := p.PutObjectLegalHold(ctx, *po.Bucket, *po.Key, "", true); err != nil {
err := p.PutObjectLegalHold(ctx, *po.Bucket, *po.Key, "", true)
if err != nil {
return s3response.PutObjectOutput{}, err
}
}
@@ -2112,46 +2165,12 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("parse object lock retention: %w", err)
}
if err := p.PutObjectRetention(ctx, *po.Bucket, *po.Key, "", true, retParsed); err != nil {
err = p.PutObjectRetention(ctx, *po.Bucket, *po.Key, "", true, retParsed)
if err != nil {
return s3response.PutObjectOutput{}, err
}
}
dataSum := hash.Sum(nil)
etag := hex.EncodeToString(dataSum[:])
err = p.meta.StoreAttribute(*po.Bucket, *po.Key, etagkey, []byte(etag))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err)
}
ctype := getString(po.ContentType)
if ctype != "" {
err := p.meta.StoreAttribute(*po.Bucket, *po.Key, contentTypeHdr,
[]byte(*po.ContentType))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set content-type attr: %w", err)
}
}
cenc := getString(po.ContentEncoding)
if cenc != "" {
err := p.meta.StoreAttribute(*po.Bucket, *po.Key, contentEncHdr,
[]byte(*po.ContentEncoding))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set content-encoding attr: %w", err)
}
}
// if the versioning is enabled, generate a new versionID for the object
var versionID string
if p.versioningEnabled() && vEnabled {
versionID = ulid.Make().String()
if err := p.meta.StoreAttribute(*po.Bucket, *po.Key, versionIdKey, []byte(versionID)); err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set versionId attr: %w", err)
}
}
return s3response.PutObjectOutput{
ETag: etag,
VersionID: versionID,
@@ -2213,12 +2232,14 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
}
// Mark the object as a delete marker
if err := p.meta.StoreAttribute(bucket, object, deleteMarkerKey, []byte{}); err != nil {
err = p.meta.StoreAttribute(nil, bucket, object, deleteMarkerKey, []byte{})
if err != nil {
return nil, fmt.Errorf("set delete marker: %w", err)
}
// Generate & set a unique versionId for the delete marker
versionId := ulid.Make().String()
if err := p.meta.StoreAttribute(bucket, object, versionIdKey, []byte(versionId)); err != nil {
err = p.meta.StoreAttribute(nil, bucket, object, versionIdKey, []byte(versionId))
if err != nil {
return nil, fmt.Errorf("set versionId: %w", err)
}
@@ -2229,7 +2250,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
} else {
versionPath := p.genObjVersionPath(bucket, object)
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) && !errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("get obj versionId: %w", err)
}
@@ -2302,17 +2323,19 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
}
for _, attr := range attrs {
data, err := p.meta.RetrieveAttribute(versionPath, srcVersionId, attr)
data, err := p.meta.RetrieveAttribute(nil, versionPath, srcVersionId, attr)
if err != nil {
return nil, fmt.Errorf("load %v attribute", attr)
}
if err := p.meta.StoreAttribute(bucket, object, attr, data); err != nil {
err = p.meta.StoreAttribute(nil, bucket, object, attr, data)
if err != nil {
return nil, fmt.Errorf("store %v attribute", attr)
}
}
if err := os.Remove(filepath.Join(versionPath, srcVersionId)); err != nil {
err = os.Remove(filepath.Join(versionPath, srcVersionId))
if err != nil {
return nil, fmt.Errorf("remove obj version %w", err)
}
@@ -2403,7 +2426,7 @@ func (p *Posix) removeParents(bucket, object string) error {
break
}
_, err := p.meta.RetrieveAttribute(bucket, parent, etagkey)
_, err := p.meta.RetrieveAttribute(nil, bucket, parent, etagkey)
if err == nil {
// a directory with a valid etag means this was specifically
// uploaded with a put object, so stop here and leave this
@@ -2497,7 +2520,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO
object := *input.Key
if versionId != "" {
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -2586,7 +2609,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO
_, contentEncoding := p.loadUserMetaData(bucket, object, userMetaData)
contentType := backend.DirContentType
b, err := p.meta.RetrieveAttribute(bucket, object, etagkey)
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -2619,7 +2642,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO
// If versioning is configured get the object versionId
if p.versioningEnabled() && versionId == "" {
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if errors.Is(err, meta.ErrNoSuchKey) {
versionId = nullVersionId
} else if err != nil {
@@ -2633,7 +2656,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO
contentType, contentEncoding := p.loadUserMetaData(bucket, object, userMetaData)
b, err := p.meta.RetrieveAttribute(bucket, object, etagkey)
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -2718,7 +2741,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
return nil, fmt.Errorf("stat part: %w", err)
}
b, err := p.meta.RetrieveAttribute(bucket, partPath, etagkey)
b, err := p.meta.RetrieveAttribute(nil, bucket, partPath, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -2744,7 +2767,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
}
if *input.VersionId != "" {
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -2806,7 +2829,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
contentType = backend.DirContentType
}
b, err := p.meta.RetrieveAttribute(bucket, object, etagkey)
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -2908,7 +2931,7 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
if !p.versioningEnabled() || !vEnabled {
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
vId, err := p.meta.RetrieveAttribute(srcBucket, srcObject, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, srcBucket, srcObject, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -2977,16 +3000,16 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
}
}
for k, v := range input.Metadata {
err := p.meta.StoreAttribute(dstBucket, dstObject,
err := p.meta.StoreAttribute(nil, dstBucket, dstObject,
fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
return nil, fmt.Errorf("set user attr %q: %w", k, err)
}
}
b, _ := p.meta.RetrieveAttribute(dstBucket, dstObject, etagkey)
b, _ := p.meta.RetrieveAttribute(nil, dstBucket, dstObject, etagkey)
etag = string(b)
vId, _ := p.meta.RetrieveAttribute(dstBucket, dstObject, versionIdKey)
vId, _ := p.meta.RetrieveAttribute(nil, dstBucket, dstObject, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -3078,7 +3101,7 @@ func (p *Posix) fileToObj(bucket string) backend.GetObjFunc {
if d.IsDir() {
// directory object only happens if directory empty
// check to see if this is a directory object by checking etag
etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey)
etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) {
return s3response.Object{}, backend.ErrSkipObj
}
@@ -3114,7 +3137,7 @@ func (p *Posix) fileToObj(bucket string) backend.GetObjFunc {
}
// file object, get object info and fill out object data
etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey)
etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
if errors.Is(err, fs.ErrNotExist) {
return s3response.Object{}, backend.ErrSkipObj
}
@@ -3218,7 +3241,8 @@ func (p *Posix) PutBucketAcl(_ context.Context, bucket string, data []byte) erro
return fmt.Errorf("stat bucket: %w", err)
}
if err := p.meta.StoreAttribute(bucket, "", aclkey, data); err != nil {
err = p.meta.StoreAttribute(nil, bucket, "", aclkey, data)
if err != nil {
return fmt.Errorf("set acl: %w", err)
}
@@ -3237,7 +3261,7 @@ func (p *Posix) GetBucketAcl(_ context.Context, input *s3.GetBucketAclInput) ([]
return nil, fmt.Errorf("stat bucket: %w", err)
}
b, err := p.meta.RetrieveAttribute(*input.Bucket, "", aclkey)
b, err := p.meta.RetrieveAttribute(nil, *input.Bucket, "", aclkey)
if errors.Is(err, meta.ErrNoSuchKey) {
return []byte{}, nil
}
@@ -3270,7 +3294,7 @@ func (p *Posix) PutBucketTagging(_ context.Context, bucket string, tags map[stri
return fmt.Errorf("marshal tags: %w", err)
}
err = p.meta.StoreAttribute(bucket, "", tagHdr, b)
err = p.meta.StoreAttribute(nil, bucket, "", tagHdr, b)
if err != nil {
return fmt.Errorf("set tags: %w", err)
}
@@ -3313,7 +3337,7 @@ func (p *Posix) GetObjectTagging(_ context.Context, bucket, object string) (map[
func (p *Posix) getAttrTags(bucket, object string) (map[string]string, error) {
tags := make(map[string]string)
b, err := p.meta.RetrieveAttribute(bucket, object, tagHdr)
b, err := p.meta.RetrieveAttribute(nil, bucket, object, tagHdr)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -3360,7 +3384,7 @@ func (p *Posix) PutObjectTagging(_ context.Context, bucket, object string, tags
return fmt.Errorf("marshal tags: %w", err)
}
err = p.meta.StoreAttribute(bucket, object, tagHdr, b)
err = p.meta.StoreAttribute(nil, bucket, object, tagHdr, b)
if errors.Is(err, fs.ErrNotExist) {
return s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -3397,7 +3421,7 @@ func (p *Posix) PutBucketPolicy(ctx context.Context, bucket string, policy []byt
return nil
}
err = p.meta.StoreAttribute(bucket, "", policykey, policy)
err = p.meta.StoreAttribute(nil, bucket, "", policykey, policy)
if err != nil {
return fmt.Errorf("set policy: %w", err)
}
@@ -3414,7 +3438,7 @@ func (p *Posix) GetBucketPolicy(ctx context.Context, bucket string) ([]byte, err
return nil, fmt.Errorf("stat bucket: %w", err)
}
policy, err := p.meta.RetrieveAttribute(bucket, "", policykey)
policy, err := p.meta.RetrieveAttribute(nil, bucket, "", policykey)
if errors.Is(err, meta.ErrNoSuchKey) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucketPolicy)
}
@@ -3441,7 +3465,7 @@ func (p *Posix) PutObjectLockConfiguration(ctx context.Context, bucket string, c
return fmt.Errorf("stat bucket: %w", err)
}
cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey)
cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey)
if errors.Is(err, meta.ErrNoSuchKey) {
return s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotAllowed)
}
@@ -3458,7 +3482,8 @@ func (p *Posix) PutObjectLockConfiguration(ctx context.Context, bucket string, c
return s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotAllowed)
}
if err := p.meta.StoreAttribute(bucket, "", bucketLockKey, config); err != nil {
err = p.meta.StoreAttribute(nil, bucket, "", bucketLockKey, config)
if err != nil {
return fmt.Errorf("set object lock config: %w", err)
}
@@ -3474,7 +3499,7 @@ func (p *Posix) GetObjectLockConfiguration(_ context.Context, bucket string) ([]
return nil, fmt.Errorf("stat bucket: %w", err)
}
cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey)
cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey)
if errors.Is(err, meta.ErrNoSuchKey) {
return nil, s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotFound)
}
@@ -3494,7 +3519,7 @@ func (p *Posix) PutObjectLegalHold(_ context.Context, bucket, object, versionId
return fmt.Errorf("stat bucket: %w", err)
}
cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey)
cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey)
if errors.Is(err, meta.ErrNoSuchKey) {
return s3err.GetAPIError(s3err.ErrInvalidBucketObjectLockConfiguration)
}
@@ -3523,7 +3548,7 @@ func (p *Posix) PutObjectLegalHold(_ context.Context, bucket, object, versionId
//TODO: Maybe we need to return our custom error here?
return s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -3537,7 +3562,7 @@ func (p *Posix) PutObjectLegalHold(_ context.Context, bucket, object, versionId
}
}
err = p.meta.StoreAttribute(bucket, object, objectLegalHoldKey, statusData)
err = p.meta.StoreAttribute(nil, bucket, object, objectLegalHoldKey, statusData)
if errors.Is(err, fs.ErrNotExist) {
if versionId != "" {
return s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -3565,7 +3590,7 @@ func (p *Posix) GetObjectLegalHold(_ context.Context, bucket, object, versionId
//TODO: Maybe we need to return our custom error here?
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -3579,7 +3604,7 @@ func (p *Posix) GetObjectLegalHold(_ context.Context, bucket, object, versionId
}
}
data, err := p.meta.RetrieveAttribute(bucket, object, objectLegalHoldKey)
data, err := p.meta.RetrieveAttribute(nil, bucket, object, objectLegalHoldKey)
if errors.Is(err, fs.ErrNotExist) {
if versionId != "" {
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -3607,7 +3632,7 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId
return fmt.Errorf("stat bucket: %w", err)
}
cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey)
cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey)
if errors.Is(err, meta.ErrNoSuchKey) {
return s3err.GetAPIError(s3err.ErrInvalidBucketObjectLockConfiguration)
}
@@ -3629,7 +3654,7 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId
//TODO: Maybe we need to return our custom error here?
return s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -3643,7 +3668,7 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId
}
}
objectLockCfg, err := p.meta.RetrieveAttribute(bucket, object, objectRetentionKey)
objectLockCfg, err := p.meta.RetrieveAttribute(nil, bucket, object, objectRetentionKey)
if errors.Is(err, fs.ErrNotExist) {
if versionId != "" {
return s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -3651,7 +3676,8 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId
return s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if errors.Is(err, meta.ErrNoSuchKey) {
if err := p.meta.StoreAttribute(bucket, object, objectRetentionKey, retention); err != nil {
err := p.meta.StoreAttribute(nil, bucket, object, objectRetentionKey, retention)
if err != nil {
return fmt.Errorf("set object lock config: %w", err)
}
@@ -3677,7 +3703,8 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId
}
}
if err := p.meta.StoreAttribute(bucket, object, objectRetentionKey, retention); err != nil {
err = p.meta.StoreAttribute(nil, bucket, object, objectRetentionKey, retention)
if err != nil {
return fmt.Errorf("set object lock config: %w", err)
}
@@ -3698,7 +3725,7 @@ func (p *Posix) GetObjectRetention(_ context.Context, bucket, object, versionId
//TODO: Maybe we need to return our custom error here?
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
@@ -3712,7 +3739,7 @@ func (p *Posix) GetObjectRetention(_ context.Context, bucket, object, versionId
}
}
data, err := p.meta.RetrieveAttribute(bucket, object, objectRetentionKey)
data, err := p.meta.RetrieveAttribute(nil, bucket, object, objectRetentionKey)
if errors.Is(err, fs.ErrNotExist) {
if versionId != "" {
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -3749,7 +3776,7 @@ func (p *Posix) ListBucketsAndOwners(ctx context.Context) (buckets []s3response.
continue
}
aclTag, err := p.meta.RetrieveAttribute(entry.Name(), "", aclkey)
aclTag, err := p.meta.RetrieveAttribute(nil, entry.Name(), "", aclkey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return buckets, fmt.Errorf("get acl tag: %w", err)
}

View File

@@ -134,6 +134,9 @@ func (tmp *tmpfile) falloc() error {
}
func (tmp *tmpfile) link() error {
// make sure this is cleaned up in all error cases
defer tmp.f.Close()
// We use Linkat/Rename as the atomic operation for object puts. The
// upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict
// with any other simultaneous uploads. The final operation is to move the
@@ -170,11 +173,21 @@ func (tmp *tmpfile) link() error {
}
defer dirf.Close()
err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW)
if err != nil {
return fmt.Errorf("link tmpfile (%q in %q): %w",
filepath.Dir(objPath), filepath.Base(tmp.f.Name()), err)
for {
err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()),
int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW)
if errors.Is(err, syscall.EEXIST) {
err := os.Remove(objPath)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove stale path: %w", err)
}
continue
}
if err != nil {
return fmt.Errorf("link tmpfile (fd %q as %q): %w",
filepath.Base(tmp.f.Name()), objPath, err)
}
break
}
err = tmp.f.Close()

View File

@@ -226,7 +226,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
}
b, err := s.meta.RetrieveAttribute(bucket, partObjPath, etagkey)
b, err := s.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -262,7 +262,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
// scoutfs move data is a metadata only operation that moves the data
// extent references from the source, appeding to the destination.
// this needs to be 4k aligned.
err = moveData(pf, f.f)
err = moveData(pf, f.File())
pf.Close()
if err != nil {
return nil, fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
@@ -282,78 +282,71 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
return nil, err
}
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
}
for k, v := range userMetaData {
err = s.meta.StoreAttribute(bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
err = s.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
// cleanup object if returning error
os.Remove(objname)
return nil, fmt.Errorf("set user attr %q: %w", k, err)
}
}
// load and set tagging
tagging, err := s.meta.RetrieveAttribute(bucket, upiddir, tagHdr)
if err == nil {
if err := s.meta.StoreAttribute(bucket, object, tagHdr, tagging); err != nil {
// cleanup object
os.Remove(objname)
return nil, fmt.Errorf("set object tagging: %w", err)
}
}
tagging, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object tagging: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
if err != nil {
return nil, fmt.Errorf("set object tagging: %w", err)
}
}
// set content-type
if cType != "" {
if err := s.meta.StoreAttribute(bucket, object, contentTypeHdr, []byte(cType)); err != nil {
// cleanup object
os.Remove(objname)
err := s.meta.StoreAttribute(f.File(), bucket, object, contentTypeHdr, []byte(cType))
if err != nil {
return nil, fmt.Errorf("set object content type: %w", err)
}
}
// load and set legal hold
lHold, err := s.meta.RetrieveAttribute(bucket, upiddir, objectLegalHoldKey)
if err == nil {
if err := s.meta.StoreAttribute(bucket, object, objectLegalHoldKey, lHold); err != nil {
// cleanup object
os.Remove(objname)
return nil, fmt.Errorf("set object legal hold: %w", err)
}
}
lHold, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object legal hold: %w", err)
}
// load and set retention
ret, err := s.meta.RetrieveAttribute(bucket, upiddir, objectRetentionKey)
if err == nil {
if err := s.meta.StoreAttribute(bucket, object, objectRetentionKey, ret); err != nil {
// cleanup object
os.Remove(objname)
return nil, fmt.Errorf("set object retention: %w", err)
err := s.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
if err != nil {
return nil, fmt.Errorf("set object legal hold: %w", err)
}
}
// load and set retention
ret, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object retention: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
if err != nil {
return nil, fmt.Errorf("set object retention: %w", err)
}
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := backend.GetMultipartMD5(parts)
err = s.meta.StoreAttribute(bucket, object, etagkey, []byte(s3MD5))
err = s.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
// cleanup object if returning error
os.Remove(objname)
return nil, fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
}
// cleanup tmp dirs
os.RemoveAll(upiddir)
// use Remove for objdir in case there are still other uploads
@@ -392,7 +385,7 @@ func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) (
if !isValidMeta(e) {
continue
}
b, err := s.meta.RetrieveAttribute(bucket, object, e)
b, err := s.meta.RetrieveAttribute(nil, bucket, object, e)
if err != nil {
continue
}
@@ -404,13 +397,13 @@ func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) (
}
var contentType, contentEncoding string
b, _ := s.meta.RetrieveAttribute(bucket, object, contentTypeHdr)
b, _ := s.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr)
contentType = string(b)
if contentType != "" {
m[contentTypeHdr] = contentType
}
b, _ = s.meta.RetrieveAttribute(bucket, object, contentEncHdr)
b, _ = s.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr)
contentEncoding = string(b)
if contentEncoding != "" {
m[contentEncHdr] = contentEncoding
@@ -466,7 +459,7 @@ func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s
return nil, fmt.Errorf("stat part: %w", err)
}
b, err := s.meta.RetrieveAttribute(bucket, partPath, etagkey)
b, err := s.meta.RetrieveAttribute(nil, bucket, partPath, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -514,7 +507,7 @@ func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s
contentType = "application/x-directory"
}
b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
b, err := s.meta.RetrieveAttribute(nil, bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -685,7 +678,7 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.Ge
contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData)
b, err := s.meta.RetrieveAttribute(bucket, object, etagkey)
b, err := s.meta.RetrieveAttribute(nil, bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
@@ -840,7 +833,7 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
if d.IsDir() {
// directory object only happens if directory empty
// check to see if this is a directory object by checking etag
etagBytes, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
etagBytes, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey)
if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) {
return s3response.Object{}, backend.ErrSkipObj
}
@@ -869,7 +862,7 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc {
}
// file object, get object info and fill out object data
b, err := s.meta.RetrieveAttribute(bucket, path, etagkey)
b, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey)
if errors.Is(err, fs.ErrNotExist) {
return s3response.Object{}, backend.ErrSkipObj
}

View File

@@ -174,6 +174,10 @@ func (tmp *tmpfile) cleanup() {
tmp.f.Close()
}
// File exposes the underlying *os.File backing the temp upload so the
// caller can set metadata attributes directly on the open file before
// it is linked into the namespace.
func (tmp *tmpfile) File() *os.File {
	return tmp.f
}
// moveData moves the data extent references from one open file to
// another via the scoutfs move-data interface (a metadata-only
// operation; no data is copied).
func moveData(from *os.File, to *os.File) error {
	return scoutfs.MoveData(from, to)
}

View File

@@ -28,9 +28,7 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
return nil, fmt.Errorf("scoutfs only available on linux")
}
type tmpfile struct {
f *os.File
}
// tmpfile is an empty stub on non-linux platforms; scoutfs is only
// available on linux, so there is no backing file here.
type tmpfile struct{}
var (
errNotSupported = errors.New("not supported")
@@ -56,6 +54,10 @@ func (tmp *tmpfile) Write(b []byte) (int, error) {
func (tmp *tmpfile) cleanup() {
}
// File returns nil on non-linux platforms where the scoutfs tmpfile is
// a no-op stub with no backing file.
func (tmp *tmpfile) File() *os.File {
	return nil
}
// moveData is unsupported off linux; scoutfs is linux-only.
func moveData(_, _ *os.File) error {
	return errNotSupported
}

View File

@@ -7,6 +7,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"
"github.com/versity/versitygw/backend/meta"
"github.com/versity/versitygw/backend/posix"
@@ -75,6 +76,9 @@ func initPosix(ctx context.Context) {
}
wg.Done()
}()
// wait for server to start
time.Sleep(1 * time.Second)
}
func TestIntegration(t *testing.T) {

1
go.mod
View File

@@ -23,6 +23,7 @@ require (
github.com/urfave/cli/v2 v2.27.4
github.com/valyala/fasthttp v1.56.0
github.com/versity/scoutfs-go v0.0.0-20240325223134-38eb2f5f7d44
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
)

2
go.sum
View File

@@ -215,6 +215,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@@ -134,6 +134,7 @@ func TestPutObject(s *S3Conf) {
PutObject_missing_object_lock_retention_config(s)
PutObject_with_object_lock(s)
PutObject_success(s)
PutObject_racey_success(s)
PutObject_invalid_credentials(s)
}
@@ -304,6 +305,9 @@ func TestCompleteMultipartUpload(s *S3Conf) {
CompleteMultipartUpload_invalid_part_number(s)
CompleteMultipartUpload_invalid_ETag(s)
CompleteMultipartUpload_success(s)
if !s.azureTests {
CompleteMultipartUpload_racey_success(s)
}
}
func TestPutBucketAcl(s *S3Conf) {
@@ -571,17 +575,19 @@ func TestVersioning(s *S3Conf) {
Versioning_Enable_object_lock(s)
Versioning_status_switch_to_suspended_with_object_lock(s)
// Object-Lock Retention
Versionsin_PutObjectRetention_invalid_versionId(s)
Versioning_PutObjectRetention_invalid_versionId(s)
Versioning_GetObjectRetention_invalid_versionId(s)
Versioning_Put_GetObjectRetention_success(s)
// Object-Lock Legal hold
Versionsin_PutObjectLegalHold_invalid_versionId(s)
Versioning_PutObjectLegalHold_invalid_versionId(s)
Versioning_GetObjectLegalHold_invalid_versionId(s)
Versioning_Put_GetObjectLegalHold_success(s)
// WORM protection
Versioning_WORM_obj_version_locked_with_legal_hold(s)
Versioning_WORM_obj_version_locked_with_governance_retention(s)
Versioning_WORM_obj_version_locked_with_compliance_retention(s)
// Concurrent requests
//Versioninig_concurrent_upload_object(s)
}
func TestVersioningDisabled(s *S3Conf) {
@@ -677,6 +683,7 @@ func GetIntTests() IntTests {
"PutObject_special_chars": PutObject_special_chars,
"PutObject_invalid_long_tags": PutObject_invalid_long_tags,
"PutObject_success": PutObject_success,
"PutObject_racey_success": PutObject_racey_success,
"HeadObject_non_existing_object": HeadObject_non_existing_object,
"HeadObject_invalid_part_number": HeadObject_invalid_part_number,
"HeadObject_non_existing_mp": HeadObject_non_existing_mp,
@@ -790,6 +797,7 @@ func GetIntTests() IntTests {
"CompleteMultipartUpload_invalid_part_number": CompleteMultipartUpload_invalid_part_number,
"CompleteMultipartUpload_invalid_ETag": CompleteMultipartUpload_invalid_ETag,
"CompleteMultipartUpload_success": CompleteMultipartUpload_success,
"CompleteMultipartUpload_racey_success": CompleteMultipartUpload_racey_success,
"PutBucketAcl_non_existing_bucket": PutBucketAcl_non_existing_bucket,
"PutBucketAcl_disabled": PutBucketAcl_disabled,
"PutBucketAcl_none_of_the_options_specified": PutBucketAcl_none_of_the_options_specified,
@@ -939,14 +947,15 @@ func GetIntTests() IntTests {
"Versioning_UploadPartCopy_from_an_object_version": Versioning_UploadPartCopy_from_an_object_version,
"Versioning_Enable_object_lock": Versioning_Enable_object_lock,
"Versioning_status_switch_to_suspended_with_object_lock": Versioning_status_switch_to_suspended_with_object_lock,
"Versionsin_PutObjectRetention_invalid_versionId": Versionsin_PutObjectRetention_invalid_versionId,
"Versioning_PutObjectRetention_invalid_versionId": Versioning_PutObjectRetention_invalid_versionId,
"Versioning_GetObjectRetention_invalid_versionId": Versioning_GetObjectRetention_invalid_versionId,
"Versioning_Put_GetObjectRetention_success": Versioning_Put_GetObjectRetention_success,
"Versionsin_PutObjectLegalHold_invalid_versionId": Versionsin_PutObjectLegalHold_invalid_versionId,
"Versioning_PutObjectLegalHold_invalid_versionId": Versioning_PutObjectLegalHold_invalid_versionId,
"Versioning_GetObjectLegalHold_invalid_versionId": Versioning_GetObjectLegalHold_invalid_versionId,
"Versioning_Put_GetObjectLegalHold_success": Versioning_Put_GetObjectLegalHold_success,
"Versioning_WORM_obj_version_locked_with_legal_hold": Versioning_WORM_obj_version_locked_with_legal_hold,
"Versioning_WORM_obj_version_locked_with_governance_retention": Versioning_WORM_obj_version_locked_with_governance_retention,
"Versioning_WORM_obj_version_locked_with_compliance_retention": Versioning_WORM_obj_version_locked_with_compliance_retention,
"Versioning_concurrent_upload_object": Versioning_concurrent_upload_object,
}
}

View File

@@ -115,6 +115,7 @@ func (c *S3Conf) Config() aws.Config {
config.WithRegion(c.awsRegion),
config.WithCredentialsProvider(creds),
config.WithHTTPClient(client),
config.WithRetryMaxAttempts(1),
}
if c.checksumDisable {

View File

@@ -19,6 +19,7 @@ import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
"errors"
"fmt"
@@ -27,6 +28,7 @@ import (
"net/url"
"regexp"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -34,6 +36,7 @@ import (
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3err"
"github.com/versity/versitygw/s3response"
"golang.org/x/sync/errgroup"
)
var (
@@ -2865,6 +2868,52 @@ func PutObject_with_object_lock(s *S3Conf) error {
return nil
}
// PutObject_racey_success races 10 concurrent PutObject requests for the
// same key in a bucket created with object lock enabled. Every racing
// upload must succeed without error; regression test for incorrect
// errors when simultaneous uploads link the same object name (#854).
func PutObject_racey_success(s *S3Conf) error {
	testName := "PutObject_racey_success"
	runF(testName)

	bucket, obj, lockStatus := getBucketName(), "my-obj", true
	client := s3.NewFromConfig(s.Config())

	ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
	_, err := client.CreateBucket(ctx, &s3.CreateBucketInput{
		Bucket:                     &bucket,
		ObjectLockEnabledForBucket: &lockStatus,
	})
	cancel()
	if err != nil {
		failF("%v: %v", testName, err)
		return fmt.Errorf("%v: %w", testName, err)
	}

	// fire all uploads for the same object name at once
	eg := errgroup.Group{}
	for i := 0; i < 10; i++ {
		eg.Go(func() error {
			ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
			_, err := client.PutObject(ctx, &s3.PutObjectInput{
				Bucket: &bucket,
				Key:    &obj,
			})
			cancel()
			return err
		})
	}

	err = eg.Wait()
	if err != nil {
		failF("%v: %v", testName, err)
		return fmt.Errorf("%v: %w", testName, err)
	}

	err = teardown(s, bucket)
	if err != nil {
		// fix: testName was missing here, leaving the second %v verb
		// without an operand in the failure output
		failF("%v: %v", testName, err)
		return fmt.Errorf("%v: %w", testName, err)
	}

	passF(testName)
	return nil
}
func PutObject_success(s *S3Conf) error {
testName := "PutObject_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
@@ -2943,7 +2992,7 @@ func HeadObject_mp_success(s *S3Conf) error {
testName := "HeadObject_mp_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
partCount, partSize := 5, 1024
partCount, partSize := int64(5), int64(1024)
partNumber := int32(3)
mp, err := createMp(s3client, bucket, obj)
@@ -2951,7 +3000,7 @@ func HeadObject_mp_success(s *S3Conf) error {
return err
}
parts, err := uploadParts(s3client, partCount*partSize, partCount, bucket, obj, *mp.UploadId)
parts, _, err := uploadParts(s3client, partCount*partSize, partCount, bucket, obj, *mp.UploadId)
if err != nil {
return err
}
@@ -5245,7 +5294,7 @@ func CreateMultipartUpload_with_metadata(s *S3Conf) error {
return err
}
parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -5319,7 +5368,7 @@ func CreateMultipartUpload_with_content_type(s *S3Conf) error {
return err
}
parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -5382,7 +5431,7 @@ func CreateMultipartUpload_with_object_lock(s *S3Conf) error {
return err
}
parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -5540,7 +5589,7 @@ func CreateMultipartUpload_with_tagging(s *S3Conf) error {
return err
}
parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -6228,7 +6277,7 @@ func ListParts_truncated(s *S3Conf) error {
return err
}
parts, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId)
parts, _, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -6292,7 +6341,7 @@ func ListParts_success(s *S3Conf) error {
return err
}
parts, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId)
parts, _, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -6781,8 +6830,8 @@ func CompleteMultipartUpload_success(s *S3Conf) error {
return err
}
objSize := 5 * 1024 * 1024
parts, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId)
objSize := int64(5 * 1024 * 1024)
parts, csum, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -6830,10 +6879,162 @@ func CompleteMultipartUpload_success(s *S3Conf) error {
return fmt.Errorf("expected the uploaded object size to be %v, instead got %v", objSize, resp.ContentLength)
}
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
defer cancel()
rget, err := s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &obj,
})
if err != nil {
return err
}
if *rget.ContentLength != int64(objSize) {
return fmt.Errorf("expected the object content-length to be %v, instead got %v", objSize, *rget.ContentLength)
}
bdy, err := io.ReadAll(rget.Body)
if err != nil {
return err
}
defer rget.Body.Close()
sum := sha256.Sum256(bdy)
getsum := hex.EncodeToString(sum[:])
if csum != getsum {
return fmt.Errorf("expected the object checksum to be %v, instead got %v", csum, getsum)
}
return nil
})
}
// mpinfo tracks one in-flight multipart upload: the upload id returned
// by CreateMultipartUpload and the completed parts to pass to
// CompleteMultipartUpload.
type mpinfo struct {
	uploadId *string
	parts    []types.CompletedPart
}
// CompleteMultipartUpload_racey_success creates 10 multipart uploads for
// the same object key, uploads 5 parts to each, then completes all of
// them concurrently. Every completion must succeed, and the surviving
// object must match the payload of exactly one of the racing uploads.
// Regression test for racing completes of the same object name (#854).
func CompleteMultipartUpload_racey_success(s *S3Conf) error {
	testName := "CompleteMultipartUpload_racey_success"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"

		// mu guards uploads and sums, which are populated concurrently
		var mu sync.RWMutex
		uploads := make([]mpinfo, 10)
		sums := make([]string, 10)
		objSize := int64(5 * 1024 * 1024)

		// phase 1: create the uploads and upload all parts
		eg := errgroup.Group{}
		for i := 0; i < 10; i++ {
			func(i int) {
				eg.Go(func() error {
					out, err := createMp(s3client, bucket, obj)
					if err != nil {
						return err
					}

					parts, csum, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId)
					// fix: record the checksum only after the error check so
					// a failed upload cannot leave a meaningless sum behind
					if err != nil {
						return err
					}
					mu.Lock()
					sums[i] = csum
					mu.Unlock()

					compParts := []types.CompletedPart{}
					for _, el := range parts {
						compParts = append(compParts, types.CompletedPart{
							ETag:       el.ETag,
							PartNumber: el.PartNumber,
						})
					}

					mu.Lock()
					uploads[i] = mpinfo{
						uploadId: out.UploadId,
						parts:    compParts,
					}
					mu.Unlock()
					return nil
				})
			}(i)
		}

		err := eg.Wait()
		if err != nil {
			return err
		}

		// phase 2: complete all uploads at once; each must succeed
		eg = errgroup.Group{}
		for i := 0; i < 10; i++ {
			func(i int) {
				eg.Go(func() error {
					ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
					mu.RLock()
					res, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
						Bucket:   &bucket,
						Key:      &obj,
						UploadId: uploads[i].uploadId,
						MultipartUpload: &types.CompletedMultipartUpload{
							Parts: uploads[i].parts,
						},
					})
					mu.RUnlock()
					cancel()
					// fix: removed stray debug fmt.Println left in the test
					if err != nil {
						return err
					}
					if *res.Key != obj {
						return fmt.Errorf("expected object key to be %v, instead got %v", obj, *res.Key)
					}
					return nil
				})
			}(i)
		}

		err = eg.Wait()
		if err != nil {
			return err
		}

		// the winning upload is unknown, so the final object content must
		// match one of the racing uploads' checksums
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		defer cancel()
		out, err := s3client.GetObject(ctx, &s3.GetObjectInput{
			Bucket: &bucket,
			Key:    &obj,
		})
		if err != nil {
			return err
		}
		if *out.ContentLength != objSize {
			return fmt.Errorf("expected the object content-length to be %v, instead got %v", objSize, *out.ContentLength)
		}

		bdy, err := io.ReadAll(out.Body)
		if err != nil {
			return err
		}
		defer out.Body.Close()

		sum := sha256.Sum256(bdy)
		csum := hex.EncodeToString(sum[:])

		mu.RLock()
		defer mu.RUnlock()
		for _, s := range sums {
			if csum == s {
				return nil
			}
		}

		return fmt.Errorf("expected the object checksum to be one of %v, instead got %v", sums, csum)
	})
}
func PutBucketAcl_non_existing_bucket(s *S3Conf) error {
testName := "PutBucketAcl_non_existing_bucket"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
@@ -11594,8 +11795,8 @@ func Versioning_Multipart_Upload_success(s *S3Conf) error {
return err
}
objSize := 5 * 1024 * 1024
parts, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId)
objSize := int64(5 * 1024 * 1024)
parts, _, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -11674,8 +11875,8 @@ func Versioning_Multipart_Upload_overwrite_an_object(s *S3Conf) error {
return err
}
objSize := 5 * 1024 * 1024
parts, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId)
objSize := int64(5 * 1024 * 1024)
parts, _, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId)
if err != nil {
return err
}
@@ -11900,8 +12101,8 @@ func Versioning_status_switch_to_suspended_with_object_lock(s *S3Conf) error {
}, withLock())
}
func Versionsin_PutObjectRetention_invalid_versionId(s *S3Conf) error {
testName := "Versionsin_PutObjectRetention_invalid_versionId"
func Versioning_PutObjectRetention_invalid_versionId(s *S3Conf) error {
testName := "Versioning_PutObjectRetention_invalid_versionId"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
_, err := createObjVersions(s3client, bucket, obj, 3)
@@ -12002,8 +12203,8 @@ func Versioning_Put_GetObjectRetention_success(s *S3Conf) error {
}, withLock(), withVersioning())
}
func Versionsin_PutObjectLegalHold_invalid_versionId(s *S3Conf) error {
testName := "Versionsin_PutObjectLegalHold_invalid_versionId"
func Versioning_PutObjectLegalHold_invalid_versionId(s *S3Conf) error {
testName := "Versioning_PutObjectLegalHold_invalid_versionId"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
_, err := createObjVersions(s3client, bucket, obj, 3)
@@ -12251,6 +12452,7 @@ func VersioningDisabled_GetBucketVersioning_not_configured(s *S3Conf) error {
return nil
})
}
func VersioningDisabled_PutBucketVersioning_not_configured(s *S3Conf) error {
testName := "VersioningDisabled_PutBucketVersioning_not_configured"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
@@ -12266,3 +12468,65 @@ func VersioningDisabled_PutBucketVersioning_not_configured(s *S3Conf) error {
return nil
})
}
// Versioning_concurrent_upload_object verifies that concurrent PutObject
// requests for the same key on a versioning-enabled bucket all succeed and
// that each upload produces its own object version (i.e. racing writers do
// not clobber or error out on one another).
func Versioning_concurrent_upload_object(s *S3Conf) error {
	// NOTE: fixed typo — testName previously read "Versioninig_…", which did
	// not match the function name used everywhere else in this file.
	testName := "Versioning_concurrent_upload_object"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"
		versionCount := 5

		// Buffered to versionCount so every uploader goroutine can report a
		// failure without blocking, even if no one is receiving yet.
		errCh := make(chan error, versionCount)

		// uploadVersion performs one zero-byte PutObject of obj and reports
		// any error on errCh. Each call gets its own short-lived context.
		uploadVersion := func(wg *sync.WaitGroup) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
			res, err := s3client.PutObject(ctx, &s3.PutObjectInput{
				Bucket: &bucket,
				Key:    &obj,
			})
			cancel()
			if err != nil {
				// Send error to the channel
				errCh <- err
				return
			}
			fmt.Printf("uploaded object successfully: versionId: %v\n", *res.VersionId)
		}

		// Launch all uploads concurrently and wait for completion before
		// closing errCh (only the "sender side" closes the channel).
		wg := &sync.WaitGroup{}
		wg.Add(versionCount)
		for i := 0; i < versionCount; i++ {
			go uploadVersion(wg)
		}
		wg.Wait()
		close(errCh)

		// Fail the test on the first upload error, if any.
		for err := range errCh {
			if err != nil {
				fmt.Printf("error uploading an object: %v\n", err.Error())
				return err
			}
		}

		// Every successful concurrent upload must have created a distinct
		// version of the object.
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		res, err := s3client.ListObjectVersions(ctx, &s3.ListObjectVersionsInput{
			Bucket: &bucket,
		})
		cancel()
		if err != nil {
			return err
		}

		if len(res.Versions) != versionCount {
			return fmt.Errorf("expected %v object versions, instead got %v", versionCount, len(res.Versions))
		}

		return nil
	}, withVersioning())
}

View File

@@ -28,7 +28,6 @@ import (
rnd "math/rand"
"net/http"
"net/url"
"os"
"os/exec"
"sort"
"strings"
@@ -707,39 +706,22 @@ func compareDelObjects(list1, list2 []types.DeletedObject) bool {
return true
}
func uploadParts(client *s3.Client, size, partCount int, bucket, key, uploadId string) (parts []types.Part, err error) {
dr := NewDataReader(size, size)
datafile := "rand.data"
w, err := os.Create(datafile)
if err != nil {
return parts, err
}
defer w.Close()
func uploadParts(client *s3.Client, size, partCount int64, bucket, key, uploadId string) (parts []types.Part, csum string, err error) {
partSize := size / partCount
_, err = io.Copy(w, dr)
if err != nil {
return parts, err
}
hash := sha256.New()
fileInfo, err := w.Stat()
if err != nil {
return parts, err
}
partSize := fileInfo.Size() / int64(partCount)
var offset int64
for partNumber := int64(1); partNumber <= int64(partCount); partNumber++ {
for partNumber := int64(1); partNumber <= partCount; partNumber++ {
partStart := (partNumber - 1) * partSize
partEnd := partStart + partSize - 1
if partEnd > fileInfo.Size()-1 {
partEnd = fileInfo.Size() - 1
if partEnd > size-1 {
partEnd = size - 1
}
partBuffer := make([]byte, partEnd-partStart+1)
_, err := w.ReadAt(partBuffer, partStart)
if err != nil {
return parts, err
}
rand.Read(partBuffer)
hash.Write(partBuffer)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
pn := int32(partNumber)
out, err := client.UploadPart(ctx, &s3.UploadPartInput{
@@ -751,17 +733,20 @@ func uploadParts(client *s3.Client, size, partCount int, bucket, key, uploadId s
})
cancel()
if err != nil {
return parts, err
return parts, "", err
}
parts = append(parts, types.Part{
ETag: out.ETag,
PartNumber: &pn,
Size: &partSize,
})
offset += partSize
}
return parts, err
sum := hash.Sum(nil)
csum = hex.EncodeToString(sum[:])
return parts, csum, err
}
type user struct {