Merge pull request #1252 from versity/ben/mp-complete-xml-response

fix: xml response field names for complete multipart upload
This commit is contained in:
Ben McClelland
2025-05-01 12:36:00 -07:00
committed by GitHub
9 changed files with 174 additions and 134 deletions

View File

@@ -1350,42 +1350,44 @@ func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultip
// Copeies the multipart metadata from .sgwtmp namespace into the newly created blob
// Deletes the multipart upload 'blob' from .sgwtmp namespace
// It indicates the end of the multipart upload
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
var res s3response.CompleteMultipartUploadResult
tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)
blobClient, err := az.getBlobClient(*input.Bucket, tmpPath)
if err != nil {
return nil, err
return res, "", err
}
props, err := blobClient.GetProperties(ctx, nil)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
tags, err := blobClient.GetTags(ctx, nil)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
return res, "", err
}
blockIds := []string{}
blockList, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
if err != nil {
return nil, azureErrToS3Err(err)
return res, "", azureErrToS3Err(err)
}
if len(blockList.UncommittedBlocks) != len(input.MultipartUpload.Parts) {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
uncommittedBlocks := map[int32]*blockblob.Block{}
for _, el := range blockList.UncommittedBlocks {
ptNumber, err := decodeBlockId(backend.GetStringFromPtr(el.Name))
if err != nil {
return nil, fmt.Errorf("invalid block name: %w", err)
return res, "", fmt.Errorf("invalid block name: %w", err)
}
uncommittedBlocks[int32(ptNumber)] = el
@@ -1397,35 +1399,35 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
last := len(blockList.UncommittedBlocks) - 1
for i, part := range input.MultipartUpload.Parts {
if part.PartNumber == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
}
if *part.PartNumber <= partNumber {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartOrder)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
block, ok := uncommittedBlocks[*part.PartNumber]
if !ok {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.ETag != *block.Name {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
// all parts except the last need to be greater, than
// the minimum allowed size (5 Mib)
if i < last && *block.Size < backend.MinPartSize {
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
totalSize += *block.Size
blockIds = append(blockIds, *block.Name)
}
if input.MpuObjectSize != nil && totalSize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
}
opts := &blockblob.CommitBlockListOptions{
@@ -1442,20 +1444,20 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
resp, err := client.CommitBlockList(ctx, blockIds, opts)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
// cleanup the multipart upload
_, err = blobClient.Delete(ctx, nil)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
return &s3.CompleteMultipartUploadOutput{
return s3response.CompleteMultipartUploadResult{
Bucket: input.Bucket,
Key: input.Key,
ETag: (*string)(resp.ETag),
}, nil
}, "", nil
}
func (az *Azure) PutBucketAcl(ctx context.Context, bucket string, data []byte) error {

View File

@@ -52,7 +52,7 @@ type Backend interface {
// multipart operations
CreateMultipartUpload(context.Context, s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (_ s3response.CompleteMultipartUploadResult, versionid string, _ error)
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput) error
ListMultipartUploads(context.Context, *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error)
ListParts(context.Context, *s3.ListPartsInput) (s3response.ListPartsResult, error)
@@ -166,8 +166,8 @@ func (BackendUnsupported) DeleteBucketCors(_ context.Context, bucket string) err
func (BackendUnsupported) CreateMultipartUpload(context.Context, s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
return s3response.CompleteMultipartUploadResult{}, "", s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput) error {
return s3err.GetAPIError(s3err.ErrNotImplemented)

View File

@@ -1374,23 +1374,25 @@ func getPartChecksum(algo types.ChecksumAlgorithm, part types.CompletedPart) str
}
}
func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
}
var res s3response.CompleteMultipartUploadResult
if input.Bucket == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
return res, "", s3err.GetAPIError(s3err.ErrInvalidBucketName)
}
if input.Key == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if input.UploadId == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchUpload)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if input.MultipartUpload == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidRequest)
return res, "", s3err.GetAPIError(s3err.ErrInvalidRequest)
}
bucket := *input.Bucket
@@ -1400,22 +1402,22 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
return res, "", fmt.Errorf("stat bucket: %w", err)
}
sum, err := p.checkUploadIDExists(bucket, object, uploadID)
if err != nil {
return nil, err
return res, "", err
}
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
checksums, err := p.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get mp checksums: %w", err)
return res, "", fmt.Errorf("get mp checksums: %w", err)
}
var checksumAlgorithm types.ChecksumAlgorithm
if checksums.Algorithm != "" {
@@ -1429,7 +1431,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
checksumType = types.ChecksumType("null")
}
return nil, s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
}
// check all parts ok
@@ -1440,13 +1442,13 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
var partNumber int32
for i, part := range parts {
if part.PartNumber == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
}
if *part.PartNumber <= partNumber {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartOrder)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
@@ -1455,14 +1457,14 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
fullPartPath := filepath.Join(bucket, partObjPath)
fi, err := os.Lstat(fullPartPath)
if err != nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
totalsize += fi.Size()
// all parts except the last need to be greater, thena
// the minimum allowed size (5 Mib)
if i < last && fi.Size() < backend.MinPartSize {
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
b, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
@@ -1471,23 +1473,23 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
etag = ""
}
if parts[i].ETag == nil || !areEtagsSame(etag, *parts[i].ETag) {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
partChecksum, err := p.retrieveChecksums(nil, bucket, partObjPath)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get part checksum: %w", err)
return res, "", fmt.Errorf("get part checksum: %w", err)
}
// If checksum has been provided on mp initalization
err = validatePartChecksum(partChecksum, part)
if err != nil {
return nil, err
return res, "", err
}
}
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}
var hashRdr *utils.HashReader
@@ -1496,12 +1498,12 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
case types.ChecksumTypeFullObject:
hashRdr, err = utils.NewHashReader(nil, "", utils.HashType(strings.ToLower(string(checksumAlgorithm))))
if err != nil {
return nil, fmt.Errorf("initialize hash reader: %w", err)
return res, "", fmt.Errorf("initialize hash reader: %w", err)
}
case types.ChecksumTypeComposite:
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksumAlgorithm))))
if err != nil {
return nil, fmt.Errorf("initialize composite checksum reader: %w", err)
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
}
}
@@ -1509,9 +1511,9 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
totalsize, acct, skipFalloc, p.forceNoTmpFile)
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
return nil, fmt.Errorf("open temp file: %w", err)
return res, "", fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
@@ -1520,7 +1522,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
if err != nil {
return nil, fmt.Errorf("open part %v: %v", *part.PartNumber, err)
return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
var rdr io.Reader = pf
@@ -1530,7 +1532,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
} else if checksums.Type == types.ChecksumTypeComposite {
err := compositeChecksumRdr.Process(getPartChecksum(checksumAlgorithm, part))
if err != nil {
return nil, fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err)
return res, "", fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err)
}
}
@@ -1538,9 +1540,9 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
pf.Close()
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
return nil, fmt.Errorf("copy part %v: %v", part.PartNumber, err)
return res, "", fmt.Errorf("copy part %v: %v", part.PartNumber, err)
}
}
@@ -1550,7 +1552,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
objMeta := p.loadObjectMetaData(bucket, upiddir, nil, userMetaData)
err = p.storeObjectMetadata(f.File(), bucket, object, objMeta)
if err != nil {
return nil, err
return res, "", err
}
objname := filepath.Join(bucket, object)
@@ -1559,13 +1561,13 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
uid, gid, doChown := p.getChownIDs(acct)
err = backend.MkdirAll(dir, uid, gid, doChown, p.newDirPerm)
if err != nil {
return nil, err
return res, "", err
}
}
vStatus, err := p.getBucketVersioningStatus(ctx, bucket)
if err != nil {
return nil, err
return res, "", err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)
@@ -1575,7 +1577,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
if p.versioningEnabled() && vEnabled && err == nil && !d.IsDir() {
_, err := p.createObjVersion(bucket, object, d.Size(), acct)
if err != nil {
return nil, fmt.Errorf("create object version: %w", err)
return res, "", fmt.Errorf("create object version: %w", err)
}
}
@@ -1586,38 +1588,38 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
err := p.meta.StoreAttribute(f.File(), bucket, object, versionIdKey, []byte(versionID))
if err != nil {
return nil, fmt.Errorf("set versionId attr: %w", err)
return res, "", fmt.Errorf("set versionId attr: %w", err)
}
}
for k, v := range userMetaData {
err = p.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
return nil, fmt.Errorf("set user attr %q: %w", k, err)
return res, "", fmt.Errorf("set user attr %q: %w", k, err)
}
}
// load and set tagging
tagging, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object tagging: %w", err)
return res, "", fmt.Errorf("get object tagging: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
if err != nil {
return nil, fmt.Errorf("set object tagging: %w", err)
return res, "", fmt.Errorf("set object tagging: %w", err)
}
}
// load and set legal hold
lHold, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object legal hold: %w", err)
return res, "", fmt.Errorf("get object legal hold: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
if err != nil {
return nil, fmt.Errorf("set object legal hold: %w", err)
return res, "", fmt.Errorf("set object legal hold: %w", err)
}
}
@@ -1645,50 +1647,50 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
switch checksumAlgorithm {
case types.ChecksumAlgorithmCrc32:
if input.ChecksumCRC32 != nil && *input.ChecksumCRC32 != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.CRC32 = &sum
crc32 = &sum
case types.ChecksumAlgorithmCrc32c:
if input.ChecksumCRC32C != nil && *input.ChecksumCRC32C != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.CRC32C = &sum
crc32c = &sum
case types.ChecksumAlgorithmSha1:
if input.ChecksumSHA1 != nil && *input.ChecksumSHA1 != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.SHA1 = &sum
sha1 = &sum
case types.ChecksumAlgorithmSha256:
if input.ChecksumSHA256 != nil && *input.ChecksumSHA256 != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.SHA256 = &sum
sha256 = &sum
case types.ChecksumAlgorithmCrc64nvme:
if input.ChecksumCRC64NVME != nil && *input.ChecksumCRC64NVME != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.CRC64NVME = &sum
crc64nvme = &sum
}
err := p.storeChecksums(f.File(), bucket, object, checksum)
if err != nil {
return nil, fmt.Errorf("store object checksum: %w", err)
return res, "", fmt.Errorf("store object checksum: %w", err)
}
}
// load and set retention
ret, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object retention: %w", err)
return res, "", fmt.Errorf("get object retention: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
if err != nil {
return nil, fmt.Errorf("set object retention: %w", err)
return res, "", fmt.Errorf("set object retention: %w", err)
}
}
@@ -1697,12 +1699,12 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
return nil, fmt.Errorf("set etag attr: %w", err)
return res, "", fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
return res, "", fmt.Errorf("link object in namespace: %w", err)
}
// cleanup tmp dirs
@@ -1711,18 +1713,17 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
// for same object name outstanding, this will fail if there are
os.Remove(filepath.Join(bucket, objdir))
return &s3.CompleteMultipartUploadOutput{
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
VersionId: &versionID,
ChecksumCRC32: crc32,
ChecksumCRC32C: crc32c,
ChecksumSHA1: sha1,
ChecksumSHA256: sha256,
ChecksumCRC64NVME: crc64nvme,
ChecksumType: checksums.Type,
}, nil
ChecksumType: &checksums.Type,
}, versionID, nil
}
func validatePartChecksum(checksum s3response.Checksum, part types.CompletedPart) error {

View File

@@ -413,9 +413,11 @@ func (s *S3Proxy) CreateMultipartUpload(ctx context.Context, input s3response.Cr
}, nil
}
func (s *S3Proxy) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (s *S3Proxy) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
var res s3response.CompleteMultipartUploadResult
if *input.Bucket == s.metaBucket {
return nil, s3err.GetAPIError(s3err.ErrAccessDenied)
return res, "", s3err.GetAPIError(s3err.ErrAccessDenied)
}
if input.ChecksumCRC32 != nil && *input.ChecksumCRC32 == "" {
input.ChecksumCRC32 = nil
@@ -454,8 +456,27 @@ func (s *S3Proxy) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
input.SSECustomerKeyMD5 = nil
}
var versionid string
out, err := s.client.CompleteMultipartUpload(ctx, input)
return out, handleError(err)
if out != nil {
res = s3response.CompleteMultipartUploadResult{
Location: out.Location,
Bucket: out.Bucket,
Key: out.Key,
ETag: out.ETag,
ChecksumCRC32: out.ChecksumCRC32,
ChecksumCRC32C: out.ChecksumCRC32C,
ChecksumCRC64NVME: out.ChecksumCRC64NVME,
ChecksumSHA1: out.ChecksumSHA1,
ChecksumSHA256: out.ChecksumSHA256,
ChecksumType: &out.ChecksumType,
}
if out.VersionId != nil {
versionid = *out.VersionId
}
}
return res, versionid, handleError(err)
}
func (s *S3Proxy) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {

View File

@@ -193,23 +193,25 @@ func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s
// CompleteMultipartUpload scoutfs complete upload uses scoutfs move blocks
// ioctl to not have to read and copy the part data to the final object. This
// saves a read and write cycle for all mutlipart uploads.
func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
}
var res s3response.CompleteMultipartUploadResult
if input.Bucket == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
return res, "", s3err.GetAPIError(s3err.ErrInvalidBucketName)
}
if input.Key == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if input.UploadId == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchUpload)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if input.MultipartUpload == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidRequest)
return res, "", s3err.GetAPIError(s3err.ErrInvalidRequest)
}
bucket := *input.Bucket
@@ -219,22 +221,22 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
return res, "", fmt.Errorf("stat bucket: %w", err)
}
sum, err := s.checkUploadIDExists(bucket, object, uploadID)
if err != nil {
return nil, err
return res, "", err
}
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
checksums, err := s.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get mp checksums: %w", err)
return res, "", fmt.Errorf("get mp checksums: %w", err)
}
// ChecksumType should be the same as specified on CreateMultipartUpload
@@ -244,7 +246,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
checksumType = types.ChecksumType("null")
}
return nil, s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
}
// check all parts ok
@@ -255,13 +257,13 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
var partNumber int32
for i, part := range parts {
if part.PartNumber == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
}
if *part.PartNumber <= partNumber {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartOrder)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
@@ -270,14 +272,14 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
fullPartPath := filepath.Join(bucket, partObjPath)
fi, err := os.Lstat(fullPartPath)
if err != nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
totalsize += fi.Size()
// all parts except the last need to be greater, thena
// the minimum allowed size (5 Mib)
if i < last && fi.Size() < backend.MinPartSize {
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
b, err := s.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
@@ -286,23 +288,23 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
etag = ""
}
if parts[i].ETag == nil || etag != *parts[i].ETag {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
partChecksum, err := s.retrieveChecksums(nil, bucket, partObjPath)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get part checksum: %w", err)
return res, "", fmt.Errorf("get part checksum: %w", err)
}
// If checksum has been provided on mp initalization
err = validatePartChecksum(partChecksum, part)
if err != nil {
return nil, err
return res, "", err
}
}
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}
// use totalsize=0 because we wont be writing to the file, only moving
@@ -310,22 +312,22 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
f, err := s.openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, 0, acct)
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
return nil, fmt.Errorf("open temp file: %w", err)
return res, "", fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
for _, part := range parts {
if part.PartNumber == nil || *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
if err != nil {
return nil, fmt.Errorf("open part %v: %v", *part.PartNumber, err)
return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
// scoutfs move data is a metadata only operation that moves the data
@@ -334,7 +336,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
err = moveData(pf, f.File())
pf.Close()
if err != nil {
return nil, fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
return res, "", fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
}
}
@@ -343,7 +345,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
objMeta := s.loadUserMetaData(bucket, upiddir, userMetaData)
err = s.storeObjectMetadata(f.File(), bucket, object, objMeta)
if err != nil {
return nil, err
return res, "", err
}
objname := filepath.Join(bucket, object)
@@ -352,50 +354,50 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
uid, gid, doChown := s.getChownIDs(acct)
err = backend.MkdirAll(dir, uid, gid, doChown, s.newDirPerm)
if err != nil {
return nil, err
return res, "", err
}
}
for k, v := range userMetaData {
err = s.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
return nil, fmt.Errorf("set user attr %q: %w", k, err)
return res, "", fmt.Errorf("set user attr %q: %w", k, err)
}
}
// load and set tagging
tagging, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object tagging: %w", err)
return res, "", fmt.Errorf("get object tagging: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
if err != nil {
return nil, fmt.Errorf("set object tagging: %w", err)
return res, "", fmt.Errorf("set object tagging: %w", err)
}
}
// load and set legal hold
lHold, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object legal hold: %w", err)
return res, "", fmt.Errorf("get object legal hold: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
if err != nil {
return nil, fmt.Errorf("set object legal hold: %w", err)
return res, "", fmt.Errorf("set object legal hold: %w", err)
}
}
// load and set retention
ret, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object retention: %w", err)
return res, "", fmt.Errorf("get object retention: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
if err != nil {
return nil, fmt.Errorf("set object retention: %w", err)
return res, "", fmt.Errorf("set object retention: %w", err)
}
}
@@ -404,12 +406,12 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
err = s.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
return nil, fmt.Errorf("set etag attr: %w", err)
return res, "", fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
return res, "", fmt.Errorf("link object in namespace: %w", err)
}
// cleanup tmp dirs
@@ -418,11 +420,11 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
// for same object name outstanding
os.Remove(filepath.Join(bucket, objdir))
return &s3.CompleteMultipartUploadOutput{
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, nil
}, "", nil
}
func (s *ScoutFS) storeObjectMetadata(f *os.File, bucket, object string, m objectMetadata) error {

View File

@@ -29,7 +29,7 @@ var _ backend.Backend = &BackendMock{}
// ChangeBucketOwnerFunc: func(contextMoqParam context.Context, bucket string, acl []byte) error {
// panic("mock out the ChangeBucketOwner method")
// },
// CompleteMultipartUploadFunc: func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
// CompleteMultipartUploadFunc: func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
// panic("mock out the CompleteMultipartUpload method")
// },
// CopyObjectFunc: func(contextMoqParam context.Context, copyObjectInput s3response.CopyObjectInput) (*s3.CopyObjectOutput, error) {
@@ -199,7 +199,7 @@ type BackendMock struct {
ChangeBucketOwnerFunc func(contextMoqParam context.Context, bucket string, acl []byte) error
// CompleteMultipartUploadFunc mocks the CompleteMultipartUpload method.
CompleteMultipartUploadFunc func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error)
CompleteMultipartUploadFunc func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error)
// CopyObjectFunc mocks the CopyObject method.
CopyObjectFunc func(contextMoqParam context.Context, copyObjectInput s3response.CopyObjectInput) (*s3.CopyObjectOutput, error)
@@ -904,7 +904,7 @@ func (mock *BackendMock) ChangeBucketOwnerCalls() []struct {
}
// CompleteMultipartUpload calls CompleteMultipartUploadFunc.
func (mock *BackendMock) CompleteMultipartUpload(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (mock *BackendMock) CompleteMultipartUpload(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
if mock.CompleteMultipartUploadFunc == nil {
panic("BackendMock.CompleteMultipartUploadFunc: method is nil but Backend.CompleteMultipartUpload was just called")
}

View File

@@ -3685,7 +3685,7 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
})
}
res, err := c.be.CompleteMultipartUpload(ctx.Context(),
res, versid, err := c.be.CompleteMultipartUpload(ctx.Context(),
&s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &key,
@@ -3702,11 +3702,11 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
ChecksumType: checksumType,
})
if err == nil {
if getstring(res.VersionId) != "" {
if versid != "" {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-version-id",
Value: getstring(res.VersionId),
Value: versid,
},
})
}
@@ -3719,7 +3719,7 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
BucketOwner: parsedAcl.Owner,
ObjectETag: res.ETag,
EventName: s3event.EventCompleteMultipartUpload,
VersionId: res.VersionId,
VersionId: backend.GetPtrFromString(versid),
})
}
return SendXMLResponse(ctx, res, err,

View File

@@ -1765,8 +1765,8 @@ func TestS3ApiController_CreateActions(t *testing.T) {
RestoreObjectFunc: func(context.Context, *s3.RestoreObjectInput) error {
return nil
},
CompleteMultipartUploadFunc: func(context.Context, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
return &s3.CompleteMultipartUploadOutput{}, nil
CompleteMultipartUploadFunc: func(context.Context, *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
return s3response.CompleteMultipartUploadResult{}, "", nil
},
CreateMultipartUploadFunc: func(context.Context, s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
return s3response.InitiateMultipartUploadResult{}, nil

View File

@@ -351,6 +351,20 @@ type CopyObjectResult struct {
CopySourceVersionId string `xml:"-"`
}
func (r CopyObjectResult) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
type Alias CopyObjectResult
aux := &struct {
LastModified string `xml:"LastModified"`
*Alias
}{
Alias: (*Alias)(&r),
}
aux.LastModified = r.LastModified.UTC().Format(iso8601TimeFormat)
return e.EncodeElement(aux, start)
}
type CopyPartResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CopyPartResult" json:"-"`
LastModified time.Time
@@ -365,18 +379,18 @@ type CopyPartResult struct {
CopySourceVersionId string `xml:"-"`
}
func (r CopyObjectResult) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
type Alias CopyObjectResult
aux := &struct {
LastModified string `xml:"LastModified"`
*Alias
}{
Alias: (*Alias)(&r),
}
aux.LastModified = r.LastModified.UTC().Format(iso8601TimeFormat)
return e.EncodeElement(aux, start)
type CompleteMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
Location *string
Bucket *string
Key *string
ETag *string
ChecksumCRC32 *string
ChecksumCRC32C *string
ChecksumSHA1 *string
ChecksumSHA256 *string
ChecksumCRC64NVME *string
ChecksumType *types.ChecksumType
}
type AccessControlPolicy struct {