fix: xml response field names for complete multipart upload

The xml encoding for the s3.CompleteMultipartUploadOutput response
type was not producing exactly the right field names for the
expected complete multipart upload result.

This change follows the pattern we have had to do for other xml
responses to create our own type that will encode better to the
expected response.

This will change the backend.Backend interface, so plugins and
other backends will have to make the corresponding changes.
This commit is contained in:
Ben McClelland
2025-04-30 14:14:19 -07:00
parent 4eba4e031c
commit 9244e9100d
9 changed files with 174 additions and 134 deletions

View File

@@ -1350,42 +1350,44 @@ func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultip
// Copeies the multipart metadata from .sgwtmp namespace into the newly created blob
// Deletes the multipart upload 'blob' from .sgwtmp namespace
// It indicates the end of the multipart upload
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
var res s3response.CompleteMultipartUploadResult
tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)
blobClient, err := az.getBlobClient(*input.Bucket, tmpPath)
if err != nil {
return nil, err
return res, "", err
}
props, err := blobClient.GetProperties(ctx, nil)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
tags, err := blobClient.GetTags(ctx, nil)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
return res, "", err
}
blockIds := []string{}
blockList, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
if err != nil {
return nil, azureErrToS3Err(err)
return res, "", azureErrToS3Err(err)
}
if len(blockList.UncommittedBlocks) != len(input.MultipartUpload.Parts) {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
uncommittedBlocks := map[int32]*blockblob.Block{}
for _, el := range blockList.UncommittedBlocks {
ptNumber, err := decodeBlockId(backend.GetStringFromPtr(el.Name))
if err != nil {
return nil, fmt.Errorf("invalid block name: %w", err)
return res, "", fmt.Errorf("invalid block name: %w", err)
}
uncommittedBlocks[int32(ptNumber)] = el
@@ -1397,35 +1399,35 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
last := len(blockList.UncommittedBlocks) - 1
for i, part := range input.MultipartUpload.Parts {
if part.PartNumber == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
}
if *part.PartNumber <= partNumber {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartOrder)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
block, ok := uncommittedBlocks[*part.PartNumber]
if !ok {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.ETag != *block.Name {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
// all parts except the last need to be greater, than
// the minimum allowed size (5 Mib)
if i < last && *block.Size < backend.MinPartSize {
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
totalSize += *block.Size
blockIds = append(blockIds, *block.Name)
}
if input.MpuObjectSize != nil && totalSize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
}
opts := &blockblob.CommitBlockListOptions{
@@ -1442,20 +1444,20 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
resp, err := client.CommitBlockList(ctx, blockIds, opts)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
// cleanup the multipart upload
_, err = blobClient.Delete(ctx, nil)
if err != nil {
return nil, parseMpError(err)
return res, "", parseMpError(err)
}
return &s3.CompleteMultipartUploadOutput{
return s3response.CompleteMultipartUploadResult{
Bucket: input.Bucket,
Key: input.Key,
ETag: (*string)(resp.ETag),
}, nil
}, "", nil
}
func (az *Azure) PutBucketAcl(ctx context.Context, bucket string, data []byte) error {

View File

@@ -52,7 +52,7 @@ type Backend interface {
// multipart operations
CreateMultipartUpload(context.Context, s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (_ s3response.CompleteMultipartUploadResult, versionid string, _ error)
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput) error
ListMultipartUploads(context.Context, *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error)
ListParts(context.Context, *s3.ListPartsInput) (s3response.ListPartsResult, error)
@@ -166,8 +166,8 @@ func (BackendUnsupported) DeleteBucketCors(_ context.Context, bucket string) err
func (BackendUnsupported) CreateMultipartUpload(context.Context, s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
return s3response.CompleteMultipartUploadResult{}, "", s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput) error {
return s3err.GetAPIError(s3err.ErrNotImplemented)

View File

@@ -1374,23 +1374,25 @@ func getPartChecksum(algo types.ChecksumAlgorithm, part types.CompletedPart) str
}
}
func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
}
var res s3response.CompleteMultipartUploadResult
if input.Bucket == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
return res, "", s3err.GetAPIError(s3err.ErrInvalidBucketName)
}
if input.Key == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if input.UploadId == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchUpload)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if input.MultipartUpload == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidRequest)
return res, "", s3err.GetAPIError(s3err.ErrInvalidRequest)
}
bucket := *input.Bucket
@@ -1400,22 +1402,22 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
return res, "", fmt.Errorf("stat bucket: %w", err)
}
sum, err := p.checkUploadIDExists(bucket, object, uploadID)
if err != nil {
return nil, err
return res, "", err
}
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
checksums, err := p.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get mp checksums: %w", err)
return res, "", fmt.Errorf("get mp checksums: %w", err)
}
var checksumAlgorithm types.ChecksumAlgorithm
if checksums.Algorithm != "" {
@@ -1429,7 +1431,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
checksumType = types.ChecksumType("null")
}
return nil, s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
}
// check all parts ok
@@ -1440,13 +1442,13 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
var partNumber int32
for i, part := range parts {
if part.PartNumber == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
}
if *part.PartNumber <= partNumber {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartOrder)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
@@ -1455,14 +1457,14 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
fullPartPath := filepath.Join(bucket, partObjPath)
fi, err := os.Lstat(fullPartPath)
if err != nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
totalsize += fi.Size()
// all parts except the last need to be greater, thena
// the minimum allowed size (5 Mib)
if i < last && fi.Size() < backend.MinPartSize {
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
b, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
@@ -1471,23 +1473,23 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
etag = ""
}
if parts[i].ETag == nil || !areEtagsSame(etag, *parts[i].ETag) {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
partChecksum, err := p.retrieveChecksums(nil, bucket, partObjPath)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get part checksum: %w", err)
return res, "", fmt.Errorf("get part checksum: %w", err)
}
// If checksum has been provided on mp initalization
err = validatePartChecksum(partChecksum, part)
if err != nil {
return nil, err
return res, "", err
}
}
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}
var hashRdr *utils.HashReader
@@ -1496,12 +1498,12 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
case types.ChecksumTypeFullObject:
hashRdr, err = utils.NewHashReader(nil, "", utils.HashType(strings.ToLower(string(checksumAlgorithm))))
if err != nil {
return nil, fmt.Errorf("initialize hash reader: %w", err)
return res, "", fmt.Errorf("initialize hash reader: %w", err)
}
case types.ChecksumTypeComposite:
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksumAlgorithm))))
if err != nil {
return nil, fmt.Errorf("initialize composite checksum reader: %w", err)
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
}
}
@@ -1509,9 +1511,9 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
totalsize, acct, skipFalloc, p.forceNoTmpFile)
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
return nil, fmt.Errorf("open temp file: %w", err)
return res, "", fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
@@ -1520,7 +1522,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
if err != nil {
return nil, fmt.Errorf("open part %v: %v", *part.PartNumber, err)
return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
var rdr io.Reader = pf
@@ -1530,7 +1532,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
} else if checksums.Type == types.ChecksumTypeComposite {
err := compositeChecksumRdr.Process(getPartChecksum(checksumAlgorithm, part))
if err != nil {
return nil, fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err)
return res, "", fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err)
}
}
@@ -1538,9 +1540,9 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
pf.Close()
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
return nil, fmt.Errorf("copy part %v: %v", part.PartNumber, err)
return res, "", fmt.Errorf("copy part %v: %v", part.PartNumber, err)
}
}
@@ -1550,7 +1552,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
objMeta := p.loadObjectMetaData(bucket, upiddir, nil, userMetaData)
err = p.storeObjectMetadata(f.File(), bucket, object, objMeta)
if err != nil {
return nil, err
return res, "", err
}
objname := filepath.Join(bucket, object)
@@ -1559,13 +1561,13 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
uid, gid, doChown := p.getChownIDs(acct)
err = backend.MkdirAll(dir, uid, gid, doChown, p.newDirPerm)
if err != nil {
return nil, err
return res, "", err
}
}
vStatus, err := p.getBucketVersioningStatus(ctx, bucket)
if err != nil {
return nil, err
return res, "", err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)
@@ -1575,7 +1577,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
if p.versioningEnabled() && vEnabled && err == nil && !d.IsDir() {
_, err := p.createObjVersion(bucket, object, d.Size(), acct)
if err != nil {
return nil, fmt.Errorf("create object version: %w", err)
return res, "", fmt.Errorf("create object version: %w", err)
}
}
@@ -1586,38 +1588,38 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
err := p.meta.StoreAttribute(f.File(), bucket, object, versionIdKey, []byte(versionID))
if err != nil {
return nil, fmt.Errorf("set versionId attr: %w", err)
return res, "", fmt.Errorf("set versionId attr: %w", err)
}
}
for k, v := range userMetaData {
err = p.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
return nil, fmt.Errorf("set user attr %q: %w", k, err)
return res, "", fmt.Errorf("set user attr %q: %w", k, err)
}
}
// load and set tagging
tagging, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object tagging: %w", err)
return res, "", fmt.Errorf("get object tagging: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
if err != nil {
return nil, fmt.Errorf("set object tagging: %w", err)
return res, "", fmt.Errorf("set object tagging: %w", err)
}
}
// load and set legal hold
lHold, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object legal hold: %w", err)
return res, "", fmt.Errorf("get object legal hold: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
if err != nil {
return nil, fmt.Errorf("set object legal hold: %w", err)
return res, "", fmt.Errorf("set object legal hold: %w", err)
}
}
@@ -1645,50 +1647,50 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
switch checksumAlgorithm {
case types.ChecksumAlgorithmCrc32:
if input.ChecksumCRC32 != nil && *input.ChecksumCRC32 != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.CRC32 = &sum
crc32 = &sum
case types.ChecksumAlgorithmCrc32c:
if input.ChecksumCRC32C != nil && *input.ChecksumCRC32C != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.CRC32C = &sum
crc32c = &sum
case types.ChecksumAlgorithmSha1:
if input.ChecksumSHA1 != nil && *input.ChecksumSHA1 != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.SHA1 = &sum
sha1 = &sum
case types.ChecksumAlgorithmSha256:
if input.ChecksumSHA256 != nil && *input.ChecksumSHA256 != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.SHA256 = &sum
sha256 = &sum
case types.ChecksumAlgorithmCrc64nvme:
if input.ChecksumCRC64NVME != nil && *input.ChecksumCRC64NVME != sum {
return nil, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
return res, "", s3err.GetChecksumBadDigestErr(checksumAlgorithm)
}
checksum.CRC64NVME = &sum
crc64nvme = &sum
}
err := p.storeChecksums(f.File(), bucket, object, checksum)
if err != nil {
return nil, fmt.Errorf("store object checksum: %w", err)
return res, "", fmt.Errorf("store object checksum: %w", err)
}
}
// load and set retention
ret, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object retention: %w", err)
return res, "", fmt.Errorf("get object retention: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
if err != nil {
return nil, fmt.Errorf("set object retention: %w", err)
return res, "", fmt.Errorf("set object retention: %w", err)
}
}
@@ -1697,12 +1699,12 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
return nil, fmt.Errorf("set etag attr: %w", err)
return res, "", fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
return res, "", fmt.Errorf("link object in namespace: %w", err)
}
// cleanup tmp dirs
@@ -1711,18 +1713,17 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
// for same object name outstanding, this will fail if there are
os.Remove(filepath.Join(bucket, objdir))
return &s3.CompleteMultipartUploadOutput{
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
VersionId: &versionID,
ChecksumCRC32: crc32,
ChecksumCRC32C: crc32c,
ChecksumSHA1: sha1,
ChecksumSHA256: sha256,
ChecksumCRC64NVME: crc64nvme,
ChecksumType: checksums.Type,
}, nil
ChecksumType: &checksums.Type,
}, versionID, nil
}
func validatePartChecksum(checksum s3response.Checksum, part types.CompletedPart) error {

View File

@@ -413,9 +413,11 @@ func (s *S3Proxy) CreateMultipartUpload(ctx context.Context, input s3response.Cr
}, nil
}
func (s *S3Proxy) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (s *S3Proxy) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
var res s3response.CompleteMultipartUploadResult
if *input.Bucket == s.metaBucket {
return nil, s3err.GetAPIError(s3err.ErrAccessDenied)
return res, "", s3err.GetAPIError(s3err.ErrAccessDenied)
}
if input.ChecksumCRC32 != nil && *input.ChecksumCRC32 == "" {
input.ChecksumCRC32 = nil
@@ -454,8 +456,27 @@ func (s *S3Proxy) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
input.SSECustomerKeyMD5 = nil
}
var versionid string
out, err := s.client.CompleteMultipartUpload(ctx, input)
return out, handleError(err)
if out != nil {
res = s3response.CompleteMultipartUploadResult{
Location: out.Location,
Bucket: out.Bucket,
Key: out.Key,
ETag: out.ETag,
ChecksumCRC32: out.ChecksumCRC32,
ChecksumCRC32C: out.ChecksumCRC32C,
ChecksumCRC64NVME: out.ChecksumCRC64NVME,
ChecksumSHA1: out.ChecksumSHA1,
ChecksumSHA256: out.ChecksumSHA256,
ChecksumType: &out.ChecksumType,
}
if out.VersionId != nil {
versionid = *out.VersionId
}
}
return res, versionid, handleError(err)
}
func (s *S3Proxy) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {

View File

@@ -193,23 +193,25 @@ func (s *ScoutFS) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s
// CompleteMultipartUpload scoutfs complete upload uses scoutfs move blocks
// ioctl to not have to read and copy the part data to the final object. This
// saves a read and write cycle for all mutlipart uploads.
func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
}
var res s3response.CompleteMultipartUploadResult
if input.Bucket == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
return res, "", s3err.GetAPIError(s3err.ErrInvalidBucketName)
}
if input.Key == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if input.UploadId == nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchUpload)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if input.MultipartUpload == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidRequest)
return res, "", s3err.GetAPIError(s3err.ErrInvalidRequest)
}
bucket := *input.Bucket
@@ -219,22 +221,22 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
return res, "", s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
return res, "", fmt.Errorf("stat bucket: %w", err)
}
sum, err := s.checkUploadIDExists(bucket, object, uploadID)
if err != nil {
return nil, err
return res, "", err
}
objdir := filepath.Join(metaTmpMultipartDir, fmt.Sprintf("%x", sum))
checksums, err := s.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get mp checksums: %w", err)
return res, "", fmt.Errorf("get mp checksums: %w", err)
}
// ChecksumType should be the same as specified on CreateMultipartUpload
@@ -244,7 +246,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
checksumType = types.ChecksumType("null")
}
return nil, s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
}
// check all parts ok
@@ -255,13 +257,13 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
var partNumber int32
for i, part := range parts {
if part.PartNumber == nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
if *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
return res, "", s3err.GetAPIError(s3err.ErrInvalidCompleteMpPartNumber)
}
if *part.PartNumber <= partNumber {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartOrder)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
@@ -270,14 +272,14 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
fullPartPath := filepath.Join(bucket, partObjPath)
fi, err := os.Lstat(fullPartPath)
if err != nil {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
totalsize += fi.Size()
// all parts except the last need to be greater, thena
// the minimum allowed size (5 Mib)
if i < last && fi.Size() < backend.MinPartSize {
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
b, err := s.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
@@ -286,23 +288,23 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
etag = ""
}
if parts[i].ETag == nil || etag != *parts[i].ETag {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
partChecksum, err := s.retrieveChecksums(nil, bucket, partObjPath)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get part checksum: %w", err)
return res, "", fmt.Errorf("get part checksum: %w", err)
}
// If checksum has been provided on mp initalization
err = validatePartChecksum(partChecksum, part)
if err != nil {
return nil, err
return res, "", err
}
}
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}
// use totalsize=0 because we wont be writing to the file, only moving
@@ -310,22 +312,22 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
f, err := s.openTmpFile(filepath.Join(bucket, metaTmpDir), bucket, object, 0, acct)
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
return nil, fmt.Errorf("open temp file: %w", err)
return res, "", fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
for _, part := range parts {
if part.PartNumber == nil || *part.PartNumber < 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPart)
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
partObjPath := filepath.Join(objdir, uploadID, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
if err != nil {
return nil, fmt.Errorf("open part %v: %v", *part.PartNumber, err)
return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
// scoutfs move data is a metadata only operation that moves the data
@@ -334,7 +336,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
err = moveData(pf, f.File())
pf.Close()
if err != nil {
return nil, fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
return res, "", fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err)
}
}
@@ -343,7 +345,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
objMeta := s.loadUserMetaData(bucket, upiddir, userMetaData)
err = s.storeObjectMetadata(f.File(), bucket, object, objMeta)
if err != nil {
return nil, err
return res, "", err
}
objname := filepath.Join(bucket, object)
@@ -352,50 +354,50 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
uid, gid, doChown := s.getChownIDs(acct)
err = backend.MkdirAll(dir, uid, gid, doChown, s.newDirPerm)
if err != nil {
return nil, err
return res, "", err
}
}
for k, v := range userMetaData {
err = s.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
if err != nil {
return nil, fmt.Errorf("set user attr %q: %w", k, err)
return res, "", fmt.Errorf("set user attr %q: %w", k, err)
}
}
// load and set tagging
tagging, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object tagging: %w", err)
return res, "", fmt.Errorf("get object tagging: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
if err != nil {
return nil, fmt.Errorf("set object tagging: %w", err)
return res, "", fmt.Errorf("set object tagging: %w", err)
}
}
// load and set legal hold
lHold, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object legal hold: %w", err)
return res, "", fmt.Errorf("get object legal hold: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
if err != nil {
return nil, fmt.Errorf("set object legal hold: %w", err)
return res, "", fmt.Errorf("set object legal hold: %w", err)
}
}
// load and set retention
ret, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get object retention: %w", err)
return res, "", fmt.Errorf("get object retention: %w", err)
}
if err == nil {
err := s.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
if err != nil {
return nil, fmt.Errorf("set object retention: %w", err)
return res, "", fmt.Errorf("set object retention: %w", err)
}
}
@@ -404,12 +406,12 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
err = s.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
return nil, fmt.Errorf("set etag attr: %w", err)
return res, "", fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return nil, fmt.Errorf("link object in namespace: %w", err)
return res, "", fmt.Errorf("link object in namespace: %w", err)
}
// cleanup tmp dirs
@@ -418,11 +420,11 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet
// for same object name outstanding
os.Remove(filepath.Join(bucket, objdir))
return &s3.CompleteMultipartUploadOutput{
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, nil
}, "", nil
}
func (s *ScoutFS) storeObjectMetadata(f *os.File, bucket, object string, m objectMetadata) error {

View File

@@ -29,7 +29,7 @@ var _ backend.Backend = &BackendMock{}
// ChangeBucketOwnerFunc: func(contextMoqParam context.Context, bucket string, acl []byte) error {
// panic("mock out the ChangeBucketOwner method")
// },
// CompleteMultipartUploadFunc: func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
// CompleteMultipartUploadFunc: func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
// panic("mock out the CompleteMultipartUpload method")
// },
// CopyObjectFunc: func(contextMoqParam context.Context, copyObjectInput s3response.CopyObjectInput) (*s3.CopyObjectOutput, error) {
@@ -199,7 +199,7 @@ type BackendMock struct {
ChangeBucketOwnerFunc func(contextMoqParam context.Context, bucket string, acl []byte) error
// CompleteMultipartUploadFunc mocks the CompleteMultipartUpload method.
CompleteMultipartUploadFunc func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error)
CompleteMultipartUploadFunc func(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error)
// CopyObjectFunc mocks the CopyObject method.
CopyObjectFunc func(contextMoqParam context.Context, copyObjectInput s3response.CopyObjectInput) (*s3.CopyObjectOutput, error)
@@ -904,7 +904,7 @@ func (mock *BackendMock) ChangeBucketOwnerCalls() []struct {
}
// CompleteMultipartUpload calls CompleteMultipartUploadFunc.
func (mock *BackendMock) CompleteMultipartUpload(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
func (mock *BackendMock) CompleteMultipartUpload(contextMoqParam context.Context, completeMultipartUploadInput *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
if mock.CompleteMultipartUploadFunc == nil {
panic("BackendMock.CompleteMultipartUploadFunc: method is nil but Backend.CompleteMultipartUpload was just called")
}

View File

@@ -3685,7 +3685,7 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
})
}
res, err := c.be.CompleteMultipartUpload(ctx.Context(),
res, versid, err := c.be.CompleteMultipartUpload(ctx.Context(),
&s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &key,
@@ -3702,11 +3702,11 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
ChecksumType: checksumType,
})
if err == nil {
if getstring(res.VersionId) != "" {
if versid != "" {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-version-id",
Value: getstring(res.VersionId),
Value: versid,
},
})
}
@@ -3719,7 +3719,7 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
BucketOwner: parsedAcl.Owner,
ObjectETag: res.ETag,
EventName: s3event.EventCompleteMultipartUpload,
VersionId: res.VersionId,
VersionId: backend.GetPtrFromString(versid),
})
}
return SendXMLResponse(ctx, res, err,

View File

@@ -1765,8 +1765,8 @@ func TestS3ApiController_CreateActions(t *testing.T) {
RestoreObjectFunc: func(context.Context, *s3.RestoreObjectInput) error {
return nil
},
CompleteMultipartUploadFunc: func(context.Context, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
return &s3.CompleteMultipartUploadOutput{}, nil
CompleteMultipartUploadFunc: func(context.Context, *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
return s3response.CompleteMultipartUploadResult{}, "", nil
},
CreateMultipartUploadFunc: func(context.Context, s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
return s3response.InitiateMultipartUploadResult{}, nil

View File

@@ -351,6 +351,20 @@ type CopyObjectResult struct {
CopySourceVersionId string `xml:"-"`
}
func (r CopyObjectResult) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
type Alias CopyObjectResult
aux := &struct {
LastModified string `xml:"LastModified"`
*Alias
}{
Alias: (*Alias)(&r),
}
aux.LastModified = r.LastModified.UTC().Format(iso8601TimeFormat)
return e.EncodeElement(aux, start)
}
type CopyPartResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CopyPartResult" json:"-"`
LastModified time.Time
@@ -365,18 +379,18 @@ type CopyPartResult struct {
CopySourceVersionId string `xml:"-"`
}
func (r CopyObjectResult) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
type Alias CopyObjectResult
aux := &struct {
LastModified string `xml:"LastModified"`
*Alias
}{
Alias: (*Alias)(&r),
}
aux.LastModified = r.LastModified.UTC().Format(iso8601TimeFormat)
return e.EncodeElement(aux, start)
type CompleteMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
Location *string
Bucket *string
Key *string
ETag *string
ChecksumCRC32 *string
ChecksumCRC32C *string
ChecksumSHA1 *string
ChecksumSHA256 *string
ChecksumCRC64NVME *string
ChecksumType *types.ChecksumType
}
type AccessControlPolicy struct {