From 62e8cddbc782b918f90b8eae09334206d124ac7d Mon Sep 17 00:00:00 2001 From: niksis02 Date: Thu, 16 Apr 2026 23:25:48 +0400 Subject: [PATCH] fix: make CompleteMultipartUpload idempotent and add part-number support to GetObject/HeadObject MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #1064 Use the multipart ETag as the in-progress directory suffix instead of the static `.inprogress` marker so that concurrent CompleteMultipartUpload calls for the same upload ID are all treated as successful (idempotent) rather than racing, where only one succeeded and the rest returned NoSuchUpload. After finalizing the multipart upload, store an `mp-metadata` xattr on the assembled object that records the upload ID and cumulative byte offsets for each part. GetObject and HeadObject now use this metadata to serve individual part ranges via the `partNumber` query parameter, returning a successful response instead of returning NotImplemented. Add two new S3 error codes: - `ErrInvalidPartNumberRange` (416 RequestedRangeNotSatisfiable) — returned when the requested part number exceeds the number of parts in the upload. - `ErrRangeAndPartNumber` (400 BadRequest) — returned when both a Range header and a partNumber query parameter are specified on the same request. 
--- backend/azure/azure.go | 161 +++++++- backend/common.go | 28 +- backend/posix/posix.go | 413 ++++++++++++------- s3api/controllers/object-get.go | 9 + s3api/controllers/object-get_test.go | 20 + s3api/controllers/object-head.go | 9 + s3api/controllers/object-head_test.go | 20 + s3err/s3err.go | 12 + tests/integration/CompleteMultipartUpload.go | 109 +++-- tests/integration/GetObject.go | 250 ++++++++++- tests/integration/HeadObject.go | 211 +++++++++- tests/integration/group-tests.go | 22 +- tests/integration/utils.go | 9 + 13 files changed, 1041 insertions(+), 232 deletions(-) diff --git a/backend/azure/azure.go b/backend/azure/azure.go index 2dcc79e9..24227e53 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -73,6 +73,10 @@ const ( // keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata. // Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here. keyMpZeroBytesParts key = "Zerobytesparts" + // keyMpMetadata stores multipart upload part-offset metadata on the final + // committed blob so that GetObject/HeadObject can serve individual parts + // by part-number. 
+ keyMpMetadata key = "Mpmetadata" defaultListingMaxKeys = 1000 ) @@ -89,6 +93,7 @@ func (key) Table() map[string]struct{} { "objectlegalhold": {}, "objname": {}, ".sgwtmp/multipart": {}, + "mpmetadata": {}, } } @@ -457,11 +462,6 @@ func (az *Azure) DeleteBucketTagging(ctx context.Context, bucket string) error { } func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { - if input.PartNumber != nil { - // querying an object with part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - client, err := az.getBlobClient(*input.Bucket, *input.Key) if err != nil { return nil, err @@ -486,8 +486,53 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G } var opts *azblob.DownloadStreamOptions - if *input.Range != "" { - offset, count, isValid, err := backend.ParseObjectRange(*resp.ContentLength, *input.Range) + var partsCount *int32 + var contentRange *string + + if input.PartNumber != nil { + // Serve a specific part if the object has multipart upload metadata. + // For non-multipart objects (no mp-metadata), partNumber=1 returns the + // full object with no Content-Range; any other partNumber is out of range. 
+ if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + var startOffset int64 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length := mpMeta.Parts[partNum-1] - startOffset + var objSize int64 + if resp.ContentLength != nil { + objSize = *resp.ContentLength + } + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize)) + opts = &azblob.DownloadStreamOptions{ + Range: blob.HTTPRange{ + Offset: startOffset, + Count: length, + }, + } + } else if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + // partNumber=1 on a non-multipart object: fall through and serve the + // full object without a range (opts remains nil, contentRange stays nil). 
+ } else if *input.Range != "" { + var objSize int64 + if resp.ContentLength != nil { + objSize = *resp.ContentLength + } + offset, count, isValid, err := backend.ParseObjectRange(objSize, *input.Range) if err != nil { return nil, err } @@ -498,8 +543,10 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G Offset: offset, }, } + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", offset, offset+count-1, objSize)) } } + blobDownloadResponse, err := az.client.DownloadStream(ctx, *input.Bucket, *input.Key, opts) if err != nil { return nil, azureErrToS3Err(err) @@ -523,18 +570,14 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G LastModified: blobDownloadResponse.LastModified, Metadata: parseAndFilterAzMetadata(blobDownloadResponse.Metadata), TagCount: &tagcount, - ContentRange: blobDownloadResponse.ContentRange, + ContentRange: contentRange, Body: blobDownloadResponse.Body, StorageClass: types.StorageClassStandard, + PartsCount: partsCount, }, nil } func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { - if input.PartNumber != nil { - // querying an object with part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - client, err := az.getBlobClient(*input.Bucket, *input.Key) if err != nil { return nil, err @@ -563,21 +606,57 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3 size = *resp.ContentLength } - startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) - if err != nil { - return nil, err - } - var contentRange string - if isValid { - contentRange = fmt.Sprintf("bytes %v-%v/%v", - startOffset, startOffset+length-1, size) + var length int64 + var partsCount *int32 + + if input.PartNumber != nil { + // Serve a specific part if the object has multipart upload metadata. 
+ // For non-multipart objects (no mp-metadata), partNumber=1 returns the + // full object with no Content-Range; any other partNumber is out of range. + if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + var startOffset int64 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length = mpMeta.Parts[partNum-1] - startOffset + contentRange = fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size) + } else if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } else { + // partNumber=1 on a non-multipart object: return full object size, + // no Content-Range, no PartsCount. + length = size + } + } else { + startOffset, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) + if err != nil { + return nil, err + } + length = lgth + if isValid { + contentRange = fmt.Sprintf("bytes %v-%v/%v", + startOffset, startOffset+length-1, size) + } } result := &s3.HeadObjectOutput{ ContentRange: &contentRange, AcceptRanges: backend.GetPtrFromString("bytes"), ContentLength: &length, + PartsCount: partsCount, ContentType: resp.ContentType, ContentEncoding: resp.ContentEncoding, ContentLanguage: resp.ContentLanguage, @@ -1598,7 +1677,29 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete props, err := blobClient.GetProperties(ctx, nil) if err != nil { - return res, "", parseMpError(err) + mpErr := parseMpError(err) + // If the tmp blob is already gone, the upload may have already been + // completed. 
Check the final object's mp-metadata and return success + // if the upload IDs match. + if errors.Is(mpErr, s3err.GetAPIError(s3err.ErrNoSuchUpload)) { + finalClient, clientErr := az.getBlobClient(*input.Bucket, *input.Key) + if clientErr == nil { + finalProps, propErr := finalClient.GetProperties(ctx, nil) + if propErr == nil { + if mpMetaStr, ok := finalProps.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil { + var mpMeta backend.MpUploadMetadata + if jsonErr := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); jsonErr == nil && mpMeta.UploadID == *input.UploadId { + return s3response.CompleteMultipartUploadResult{ + Bucket: input.Bucket, + Key: input.Key, + ETag: backend.GetPtrFromString(convertAzureEtag(finalProps.ETag)), + }, "", nil + } + } + } + } + } + return res, "", mpErr } tags, err := blobClient.GetTags(ctx, nil) if err != nil { @@ -1646,6 +1747,8 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete // The initial value is the lower limit of partNumber: 0 var totalSize int64 var partNumber int32 + // partSizes[i] = cumulative byte offset after part i+1 (see backend.MpUploadMetadata) + var partSizes []int64 last := len(input.MultipartUpload.Parts) - 1 for i, part := range input.MultipartUpload.Parts { if part.PartNumber == nil { @@ -1672,6 +1775,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall) } // Zero-byte parts contribute no data; skip adding to blockIds. 
+ partSizes = append(partSizes, totalSize) continue } return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) @@ -1686,6 +1790,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall) } totalSize += *block.Size + partSizes = append(partSizes, totalSize) blockIds = append(blockIds, *block.Name) } @@ -1697,6 +1802,18 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete // Remove internal tracking keys from metadata before storing on the final blob. delete(props.Metadata, string(keyMpZeroBytesParts)) + // Serialize multipart metadata so GetObject/HeadObject can serve by part-number. + mpMeta := backend.MpUploadMetadata{UploadID: *input.UploadId, Parts: partSizes} + mpMetaJSON, err := json.Marshal(mpMeta) + if err != nil { + return res, "", fmt.Errorf("marshal mp metadata: %w", err) + } + mpMetaStr := string(mpMetaJSON) + if props.Metadata == nil { + props.Metadata = map[string]*string{} + } + props.Metadata[string(keyMpMetadata)] = &mpMetaStr + opts := &blockblob.CommitBlockListOptions{ Metadata: props.Metadata, Tags: parseAzTags(tags.BlobTagSet), diff --git a/backend/common.go b/backend/common.go index 83c30b93..14fbc89f 100644 --- a/backend/common.go +++ b/backend/common.go @@ -384,21 +384,21 @@ func isValidTagComponent(str string) bool { return validTagComponent.Match([]byte(str)) } -func GetMultipartMD5(parts []types.CompletedPart) string { +func GetMultipartMD5(parts []types.CompletedPart) (string, error) { var partsEtagBytes []byte for _, part := range parts { - partsEtagBytes = append(partsEtagBytes, getEtagBytes(*part.ETag)...) + bts, err := getEtagBytes(*part.ETag) + if err != nil { + return "", fmt.Errorf("decode etag: %w", err) + } + partsEtagBytes = append(partsEtagBytes, bts...) 
} - return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)) + return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)), nil } -func getEtagBytes(etag string) []byte { - decode, err := hex.DecodeString(strings.ReplaceAll(etag, string('"'), "")) - if err != nil { - return []byte(etag) - } - return decode +func getEtagBytes(etag string) ([]byte, error) { + return hex.DecodeString(strings.ReplaceAll(etag, string('"'), "")) } func md5String(data []byte) string { @@ -406,6 +406,16 @@ func md5String(data []byte) string { return hex.EncodeToString(sum[:]) } +// MpUploadMetadata is stored alongside the final object after a multipart +// upload completes. It records the uploadId and the cumulative byte offsets +// of each part: parts[i] = sum of sizes of parts 1..i+1. This allows O(1) +// total size lookup (parts[last]) and O(1) part-start/size derivation for +// GetObject/HeadObject part-number requests. +type MpUploadMetadata struct { + UploadID string `json:"uploadId"` + Parts []int64 `json:"parts"` +} + type FileSectionReadCloser struct { R io.Reader F *os.File diff --git a/backend/posix/posix.go b/backend/posix/posix.go index bf4c9fa6..397babb1 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -136,6 +136,7 @@ const ( deleteMarkerKey = "delete-marker" versionIdKey = "version-id" partCrc64nvme = "part-crc64nvme" + mpMetaKey = "mp-metadata" nullVersionId = "null" @@ -1690,25 +1691,69 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C return res, "", fmt.Errorf("stat bucket: %w", err) } - sum, err := p.checkUploadIDExists(bucket, object, uploadID) - if err != nil { - return res, "", err - } - - // Atomically rename the upload directory to .inprogress so that - // concurrent CompleteMultipartUpload calls for the same upload ID see it as - // absent and return ErrNoSuchUpload rather than racing through the rest of - // the function. 
If this call does not succeed we rename it back (see the - deferred cleanup below). + // Rename the upload directory to atomically claim + // the processing slot. A concurrent call with the same uploadId will compute + // the same ETag, so it will either find the directory still present (still + // processing) or gone (already completed) and react accordingly. + sum := sha256.Sum256([]byte(object)) objdirFull := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) uploadIDDir := filepath.Join(objdirFull, uploadID) - activeUploadName := uploadID + inProgressSuffix + // Calculate s3 compatible md5sum for complete multipart. + s3MD5, err := backend.GetMultipartMD5(parts) + if err != nil { + return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) + } + activeUploadName := fmt.Sprintf("%s.%s%s", uploadID, strings.Trim(s3MD5, "\""), inProgressSuffix) uploadIDInProgress := filepath.Join(objdirFull, activeUploadName) - if err := os.Rename(uploadIDDir, uploadIDInProgress); err != nil { - if errors.Is(err, fs.ErrNotExist) { - return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload) + + err = os.Rename(uploadIDDir, uploadIDInProgress) + if errors.Is(err, fs.ErrNotExist) { + // Another call already claimed this slot and is still assembling the object. + if _, statErr := os.Stat(uploadIDInProgress); statErr == nil { + // Still in progress — treat as success for idempotency. + return s3response.CompleteMultipartUploadResult{ + Bucket: &bucket, + ETag: &s3MD5, + Key: &object, + }, "", nil } - return res, "", fmt.Errorf("mark upload in-progress: %w", err) + // Directory is gone: the concurrent call already completed and cleaned up. + if _, statErr := os.Stat(filepath.Join(bucket, object)); statErr == nil { + return s3response.CompleteMultipartUploadResult{ + Bucket: &bucket, + ETag: &s3MD5, + Key: &object, + }, "", nil + } + + // Last resort: the object stat above may have lost a race with the + // concurrent call's link step.
Check the mp-metadata xattr, as this + // multipart upload may have been finalized and the final object has been created + // before or by the racing request + if mpMetaBytes, statErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey); statErr == nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil { + return res, "", fmt.Errorf("parse object multipart metadata: %w", err) + } + + // The object may have been overwritten by a newer upload or + // it's the result of a completely different multipart upload; only + // treat it as our completion if the upload IDs match. + if mpMeta.UploadID != uploadID { + return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload) + } + + return s3response.CompleteMultipartUploadResult{ + Bucket: &bucket, + ETag: &s3MD5, + Key: &object, + }, "", nil + } + + return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload) + } + if err != nil { + return res, "", fmt.Errorf("rename upload to etag dir: %w", err) } // Rename sidecar metadata to match the new data directory path. 
@@ -1761,11 +1806,23 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme } + // Initialize composite checksum reader + var compositeChecksumRdr *utils.CompositeChecksumReader + if checksums.Type == types.ChecksumTypeComposite { + compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm)))) + if err != nil { + return res, "", fmt.Errorf("initialize composite checksum reader: %w", err) + } + } + // check all parts ok last := len(parts) - 1 var totalsize int64 + // cumulative byte offsets: partSizes[i] = sum of sizes of parts 1..i+1 + var partSizes []int64 + var composableCsum string - // The initialie values is the lower limit of partNumber: 0 + // The initial value is the lower limit of partNumber: 0 var partNumber int32 for i, part := range parts { if part.PartNumber == nil { @@ -1788,6 +1845,7 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } totalsize += fi.Size() + partSizes = append(partSizes, totalsize) // all parts except the last need to be greater, than or equal to // the minimum allowed size (5 Mib) if i < last && fi.Size() < backend.MinPartSize { @@ -1813,18 +1871,86 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C if err != nil { return res, "", err } + + // Accumulate checksum state. 
+ switch checksums.Type { + case types.ChecksumTypeFullObject: + var pcs string + if mpChecksumType != "" { + pcs = getPartChecksum(checksums.Algorithm, part) + } else { + crc64nvme, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, partCrc64nvme) + if err != nil { + return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err) + } + pcs = string(crc64nvme) + } + if i == 0 { + composableCsum = pcs + } else { + composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, pcs, fi.Size()) + if err != nil { + return res, "", fmt.Errorf("add part %v checksum: %w", *part.PartNumber, err) + } + } + case types.ChecksumTypeComposite: + if err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)); err != nil { + return res, "", fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err) + } + } } if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize { return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize) } - var compositeChecksumRdr *utils.CompositeChecksumReader - if checksums.Type == types.ChecksumTypeComposite { - // initialize the composite checksum reader if the mp checksum type is COMPOSITE - compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm)))) - if err != nil { - return res, "", fmt.Errorf("initialize composite checksum reader: %w", err) + // Compute the final checksum value. 
+ var value string + switch checksums.Type { + case types.ChecksumTypeComposite: + value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts)) + case types.ChecksumTypeFullObject: + value = composableCsum + } + + var crc32 *string + var crc32c *string + var sha1 *string + var sha256 *string + var crc64nvme *string + var gotSum *string + + switch checksums.Algorithm { + case types.ChecksumAlgorithmCrc32: + gotSum = input.ChecksumCRC32 + checksums.CRC32 = &value + crc32 = &value + case types.ChecksumAlgorithmCrc32c: + gotSum = input.ChecksumCRC32C + checksums.CRC32C = &value + crc32c = &value + case types.ChecksumAlgorithmSha1: + gotSum = input.ChecksumSHA1 + checksums.SHA1 = &value + sha1 = &value + case types.ChecksumAlgorithmSha256: + gotSum = input.ChecksumSHA256 + checksums.SHA256 = &value + sha256 = &value + case types.ChecksumAlgorithmCrc64nvme: + gotSum = input.ChecksumCRC64NVME + checksums.CRC64NVME = &value + crc64nvme = &value + } + + // Check if the provided checksum and the calculated one are the same. 
+ if mpChecksumType != "" && gotSum != nil { + s := *gotSum + if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") { + s = fmt.Sprintf("%s-%v", s, len(parts)) + } + if s != value { + return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm) } } @@ -1838,56 +1964,14 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } defer f.cleanup() - var composableCsum string var abortOnErrSet bool - for i, part := range parts { + for _, part := range parts { partObjPath := filepath.Join(objdir, activeUploadName, fmt.Sprintf("%v", *part.PartNumber)) fullPartPath := filepath.Join(bucket, partObjPath) pf, err := os.Open(fullPartPath) if err != nil { return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err) } - pfi, err := pf.Stat() - if err != nil { - pf.Close() - return res, "", fmt.Errorf("stat part %v: %v", *part.PartNumber, err) - } - - switch checksums.Type { - case types.ChecksumTypeFullObject: - var partChecksum string - if mpChecksumType != "" { - // if any checksum has been initially specified on mp creation - // read the part checksum configuration - partChecksum = getPartChecksum(checksums.Algorithm, part) - } else { - // if no checksum has been specified on mp creation - // retrieve the internally stored crc64nvme - crc64nvme, err := p.meta.RetrieveAttribute(pf, bucket, partObjPath, partCrc64nvme) - if err != nil { - pf.Close() - return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err) - } - partChecksum = string(crc64nvme) - } - if i == 0 { - composableCsum = partChecksum - break - } - composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, partChecksum, pfi.Size()) - if err != nil { - pf.Close() - return res, "", fmt.Errorf("add part %v checksum: %w", - *part.PartNumber, err) - } - case types.ChecksumTypeComposite: - err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)) - if err != nil { - pf.Close() - return res, "", 
fmt.Errorf("process %v part checksum: %w", - *part.PartNumber, err) - } - } if customCopy != nil { idemp, err := customCopy(pf, f.File()) @@ -2004,59 +2088,6 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } } - var crc32 *string - var crc32c *string - var sha1 *string - var sha256 *string - var crc64nvme *string - - var value string - switch checksums.Type { - case types.ChecksumTypeComposite: - value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts)) - case types.ChecksumTypeFullObject: - value = composableCsum - } - - var gotSum *string - - switch checksums.Algorithm { - case types.ChecksumAlgorithmCrc32: - gotSum = input.ChecksumCRC32 - checksums.CRC32 = &value - crc32 = &value - case types.ChecksumAlgorithmCrc32c: - gotSum = input.ChecksumCRC32C - checksums.CRC32C = &value - crc32c = &value - case types.ChecksumAlgorithmSha1: - gotSum = input.ChecksumSHA1 - checksums.SHA1 = &value - sha1 = &value - case types.ChecksumAlgorithmSha256: - gotSum = input.ChecksumSHA256 - checksums.SHA256 = &value - sha256 = &value - case types.ChecksumAlgorithmCrc64nvme: - gotSum = input.ChecksumCRC64NVME - checksums.CRC64NVME = &value - crc64nvme = &value - } - - // Check if the provided checksum and the calculated one are the same - if mpChecksumType != "" && gotSum != nil { - s := *gotSum - if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") { - // if number of parts is not specified in the final checksum - // make sure to add, to not fail in the final comparison - s = fmt.Sprintf("%s-%v", s, len(parts)) - } - - if s != value { - return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm) - } - } - err = p.storeChecksums(f.File(), bucket, object, checksums) if err != nil { return res, "", fmt.Errorf("store object checksum: %w", err) @@ -2074,14 +2105,24 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } } - // Calculate s3 compatible md5sum for complete multipart. 
- s3MD5 := backend.GetMultipartMD5(parts) - err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5)) if err != nil { return res, "", fmt.Errorf("set etag attr: %w", err) } + // Store multipart upload metadata on the final object so that GetObject / + // HeadObject can serve individual parts by part-number. + mpMeta := backend.MpUploadMetadata{UploadID: uploadID, Parts: partSizes} + mpMetaJSON, err := json.Marshal(mpMeta) + if err != nil { + return res, "", fmt.Errorf("marshal object multipart metadata: %w", err) + } + + err = p.meta.StoreAttribute(f.File(), bucket, object, mpMetaKey, mpMetaJSON) + if err != nil { + return res, "", fmt.Errorf("set object multipart metadata: %w", err) + } + err = f.link() if err != nil { return res, "", fmt.Errorf("link object in namespace: %w", err) @@ -4182,11 +4223,6 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge return nil, err } - if input.PartNumber != nil { - // querying an object by part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - if !p.versioningEnabled() && versionId != "" { //TODO: Maybe we need to return our custom error here? return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -4370,15 +4406,59 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge } objSize := fi.Size() - startOffset, length, isValid, err := backend.ParseObjectRange(objSize, *input.Range) - if err != nil { - return nil, err + if fi.IsDir() { + objSize = 0 } - var contentRange string - if isValid { - contentRange = fmt.Sprintf("bytes %v-%v/%v", - startOffset, startOffset+length-1, objSize) + var contentRange *string + var startOffset, length int64 + var partsCount *int32 + + // If partNumber is requested and mp-metadata exists, serve that specific part. + // For non-multipart objects (no mp-metadata), partNumber=1 returns the full + // object with no Content-Range; any other partNumber is out of range. 
+ // Both range read and partNumber can't be used together. + if input.PartNumber != nil { + mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey) + if metaErr == nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + // Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length = mpMeta.Parts[partNum-1] - startOffset + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize)) + } else if errors.Is(metaErr, meta.ErrNoSuchKey) { + // Non-multipart object: partNumber=1 means the whole object; anything + // higher is out of range + if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + length = objSize + } else { + return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr) + } + } else { + start, lgth, isValid, err := backend.ParseObjectRange(objSize, getString(input.Range)) + if err != nil { + return nil, err + } + startOffset, length = start, lgth + + if isValid { + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, objSize)) + } } objMeta := p.loadObjectMetaProperties(f, bucket, object, &fi) @@ -4422,7 +4502,7 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge LastModified: backend.GetTimePtr(fi.ModTime()), Metadata: objMeta.Metadata, TagCount: tagCount, - ContentRange: &contentRange, + ContentRange: contentRange, StorageClass: types.StorageClassStandard, VersionId: &versionId, Body: body, @@ -4432,6 +4512,7 @@ func (p *Posix) GetObject(ctx context.Context, input 
*s3.GetObjectInput) (*s3.Ge ChecksumSHA256: checksums.SHA256, ChecksumCRC64NVME: checksums.CRC64NVME, ChecksumType: checksums.Type, + PartsCount: partsCount, }, nil } @@ -4451,11 +4532,6 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. return nil, err } - if input.PartNumber != nil { - // querying an object by part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - if !p.versioningEnabled() && versionId != "" { //TODO: Maybe we need to return our custom error here? return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -4569,15 +4645,55 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. size = 0 } - startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) - if err != nil { - return nil, err - } + var contentRange *string + var startOffset, length int64 + var partsCount *int32 - var contentRange string - if isValid { - contentRange = fmt.Sprintf("bytes %v-%v/%v", - startOffset, startOffset+length-1, size) + // If partNumber is requested and mp-metadata exists, serve that specific part. + // For non-multipart objects (no mp-metadata), partNumber=1 returns the full + // object with no Content-Range; any other partNumber is out of range. + // Both range read and partNumber can't be used together. 
+ if input.PartNumber != nil { + mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey) + if metaErr == nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + // Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length = mpMeta.Parts[partNum-1] - startOffset + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size)) + } else if errors.Is(metaErr, meta.ErrNoSuchKey) { + // Non-multipart object: partNumber=1 means the whole object; anything + // higher is out of range + if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + length = size + } else { + return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr) + } + } else { + start, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) + if err != nil { + return nil, err + } + startOffset, length = start, lgth + + if isValid { + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, size)) + } } var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus @@ -4622,7 +4738,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. return &s3.HeadObjectOutput{ ContentLength: &length, AcceptRanges: backend.GetPtrFromString("bytes"), - ContentRange: &contentRange, + ContentRange: contentRange, ContentType: objMeta.ContentType, ContentEncoding: objMeta.ContentEncoding, ContentDisposition: objMeta.ContentDisposition, @@ -4644,6 +4760,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. 
ChecksumCRC64NVME: checksums.CRC64NVME, ChecksumType: checksums.Type, TagCount: tagCount, + PartsCount: partsCount, }, nil } diff --git a/s3api/controllers/object-get.go b/s3api/controllers/object-get.go index b5330d21..5aaea3f1 100644 --- a/s3api/controllers/object-get.go +++ b/s3api/controllers/object-get.go @@ -460,6 +460,15 @@ func (c S3ApiController) GetObject(ctx *fiber.Ctx) (*Response, error) { }, s3err.GetAPIError(s3err.ErrInvalidPartNumber) } + if acceptRange != "" { + debuglogger.Logf("Range and partNumber cannot both be specified") + return &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: parsedAcl.Owner, + }, + }, s3err.GetAPIError(s3err.ErrRangeAndPartNumber) + } + partNumber = &partNumberQuery } diff --git a/s3api/controllers/object-get_test.go b/s3api/controllers/object-get_test.go index 27bbd874..06950f0e 100644 --- a/s3api/controllers/object-get_test.go +++ b/s3api/controllers/object-get_test.go @@ -722,6 +722,26 @@ func TestS3ApiController_GetObject(t *testing.T) { err: s3err.GetAPIError(s3err.ErrInvalidPartNumber), }, }, + { + name: "both partNumber and Range", + input: testInput{ + locals: defaultLocals, + queries: map[string]string{ + "partNumber": "2", + }, + headers: map[string]string{ + "Range": "bytes=10-20", + }, + }, + output: testOutput{ + response: &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: "root", + }, + }, + err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber), + }, + }, { name: "backend returns error", input: testInput{ diff --git a/s3api/controllers/object-head.go b/s3api/controllers/object-head.go index 96719285..e4c63951 100644 --- a/s3api/controllers/object-head.go +++ b/s3api/controllers/object-head.go @@ -107,6 +107,15 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) (*Response, error) { }, s3err.GetAPIError(s3err.ErrInvalidPartNumber) } + if objRange != "" { + debuglogger.Logf("Range and partNumber cannot both be specified") + return &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: parsedAcl.Owner, + }, + 
}, s3err.GetAPIError(s3err.ErrRangeAndPartNumber) + } + partNumber = &partNumberQuery } diff --git a/s3api/controllers/object-head_test.go b/s3api/controllers/object-head_test.go index f7ff245d..f62859d1 100644 --- a/s3api/controllers/object-head_test.go +++ b/s3api/controllers/object-head_test.go @@ -98,6 +98,26 @@ func TestS3ApiController_HeadObject(t *testing.T) { err: s3err.GetAPIError(s3err.ErrInvalidPartNumber), }, }, + { + name: "both partNumber and Range", + input: testInput{ + locals: defaultLocals, + queries: map[string]string{ + "partNumber": "6", + }, + headers: map[string]string{ + "Range": "bytes=1-3", + }, + }, + output: testOutput{ + response: &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: "root", + }, + }, + err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber), + }, + }, { name: "invalid checksum mode", input: testInput{ diff --git a/s3err/s3err.go b/s3err/s3err.go index a0fb2cb5..ae9de4fd 100644 --- a/s3err/s3err.go +++ b/s3err/s3err.go @@ -81,6 +81,8 @@ const ( ErrInvalidObjectAttributes ErrInvalidPart ErrInvalidPartNumber + ErrInvalidPartNumberRange + ErrRangeAndPartNumber ErrInvalidPartOrder ErrInvalidCompleteMpPartNumber ErrInternalError @@ -339,6 +341,16 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "Part number must be an integer between 1 and 10000, inclusive.", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidPartNumberRange: { + Code: "InvalidPartNumber", + Description: "The requested partnumber is not satisfiable.", + HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable, + }, + ErrRangeAndPartNumber: { + Code: "InvalidRequest", + Description: "Cannot specify both Range header and partNumber query parameter", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInvalidPartOrder: { Code: "InvalidPartOrder", Description: "The list of parts was not in ascending order. 
Parts must be ordered by part number.", diff --git a/tests/integration/CompleteMultipartUpload.go b/tests/integration/CompleteMultipartUpload.go index fb838ef7..78b621ab 100644 --- a/tests/integration/CompleteMultipartUpload.go +++ b/tests/integration/CompleteMultipartUpload.go @@ -1907,13 +1907,77 @@ func CompleteMultipartUpload_racey_success(s *S3Conf) error { }) } +func CompleteMultipartUpload_already_completed(s *S3Conf) error { + testName := "CompleteMultipartUpload_already_completed" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + parts, _, err := uploadParts(s3client, 5*1024*1024, 1, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := []types.CompletedPart{} + for _, el := range parts { + compParts = append(compParts, types.CompletedPart{ + ETag: el.ETag, + PartNumber: el.PartNumber, + }) + } + + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + first, err := s3client.CompleteMultipartUpload(ctx, completeInput) + cancel() + if err != nil { + return err + } + + // Second call with the same upload ID should also succeed (idempotent). 
+ ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) + second, err := s3client.CompleteMultipartUpload(ctx, completeInput) + cancel() + if err != nil { + return err + } + + if getString(second.Bucket) != bucket { + return fmt.Errorf("expected Bucket to be %s, instead got %s", bucket, getString(second.Bucket)) + } + if getString(second.Key) != obj { + return fmt.Errorf("expected Key to be %s, instead got %s", obj, getString(second.Key)) + } + if getString(second.ETag) != getString(first.ETag) { + return fmt.Errorf("expected ETag to be %s, instead got %s", getString(first.ETag), getString(second.ETag)) + } + location := constructObjectLocation(s.endpoint, bucket, obj, s.hostStyle) + if getString(second.Location) != location { + return fmt.Errorf("expected Location to be %s, instead got %s", location, getString(second.Location)) + } + + return nil + }) +} + // CompleteMultipartUpload_racey_data_integrity creates a single multipart // upload, uploads its parts, then fires multiple concurrent -// CompleteMultipartUpload calls for the exact same upload ID. Exactly one -// call must succeed; the rest must fail with NoSuchUpload (the upload was -// already consumed). The surviving object must contain exactly the data that -// was uploaded. The whole sequence is repeated several times so that -// intermittent scheduling cannot hide a data-corruption bug. +// CompleteMultipartUpload calls for the exact same upload ID. All calls must +// succeed and return the same ETag (idempotent completion). The surviving +// object must contain exactly the data that was uploaded. The whole sequence +// is repeated several times so that intermittent scheduling cannot hide a +// data-corruption bug. 
func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { testName := "CompleteMultipartUpload_racey_data_integrity" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { @@ -1924,7 +1988,6 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { ) obj := "my-obj" objSize := int64(15 * 1024 * 1024) // 3 × 5 MiB parts - noSuchUpload := s3err.GetAPIError(s3err.ErrNoSuchUpload) for iter := range iterations { // Phase 1: create one upload and upload its parts. @@ -1938,25 +2001,26 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { return fmt.Errorf("iteration %d: upload parts: %w", iter, err) } + var partsEtagBytes []byte compParts := make([]types.CompletedPart, 0, len(parts)) for _, el := range parts { + b, err := getEtagBytes(*el.ETag) + if err != nil { + return fmt.Errorf("iteration %d: %w", iter, err) + } + partsEtagBytes = append(partsEtagBytes, b...) compParts = append(compParts, types.CompletedPart{ ETag: el.ETag, PartNumber: el.PartNumber, }) } + expectedETag := fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)) - // Phase 2: race concurrency complete calls for the same upload ID. - // Exactly one must succeed; the others must return NoSuchUpload. - var ( - mu sync.Mutex - successCnt int - ) eg := errgroup.Group{} for range concurrency { eg.Go(func() error { ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - _, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + res, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ Bucket: &bucket, Key: &obj, UploadId: out.UploadId, @@ -1965,23 +2029,20 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { }, }) cancel() - if err == nil { - mu.Lock() - successCnt++ - mu.Unlock() - return nil + if err != nil { + return err } - // Every non-zero error must be NoSuchUpload. 
- return checkApiErr(err, noSuchUpload) + + if getString(res.ETag) != expectedETag { + return fmt.Errorf("expected the multipart upload ETag to be %s, instead got %s", expectedETag, getString(res.ETag)) + } + + return nil }) } if err := eg.Wait(); err != nil { return fmt.Errorf("iteration %d: complete phase: %w", iter, err) } - if successCnt != 1 { - return fmt.Errorf("iteration %d: expected exactly 1 successful complete, got %d", - iter, successCnt) - } // Phase 3: download the object and verify it is complete and // uncorrupted — its checksum must match what was uploaded. diff --git a/tests/integration/GetObject.go b/tests/integration/GetObject.go index 6ee11397..1a5be2ca 100644 --- a/tests/integration/GetObject.go +++ b/tests/integration/GetObject.go @@ -15,6 +15,7 @@ package integration import ( + "bytes" "context" "crypto/sha256" "errors" @@ -1256,17 +1257,246 @@ func GetObject_invalid_part_number(s *S3Conf) error { }) } -func GetObject_part_number_not_supported(s *S3Conf) error { - testName := "GetObject_part_number_not_supported" +func GetObject_range_and_part_number(s *S3Conf) error { + testName := "GetObject_range_and_part_number" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { - ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - defer cancel() - _, err := s3client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: &bucket, - Key: getPtr("obj"), - PartNumber: getPtr(int32(3)), - }) + obj := "my-obj" + _, err := putObjectWithData(100, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } - return checkApiErr(err, s3err.GetAPIError(s3err.ErrNotImplemented)) + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + Range: getPtr("bytes=0-9"), + PartNumber: &pn, + }) + cancel() + return checkApiErr(err, 
s3err.GetAPIError(s3err.ErrRangeAndPartNumber)) + }) +} + +func GetObject_mp_part_number_exceeds_parts_count(s *S3Conf) error { + testName := "GetObject_mp_part_number_exceeds_parts_count" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = int64(5) + parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := make([]types.CompletedPart, len(parts)) + for i, p := range parts { + compParts[i] = types.CompletedPart{ + ETag: p.ETag, + PartNumber: p.PartNumber, + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + // partNumber exceeds the number of parts in the completed upload + pn := int32(partCount + 1) + ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + return checkApiErr(err, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)) + }) +} + +func GetObject_mp_part_number_success(s *S3Conf) error { + testName := "GetObject_mp_part_number_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = 3 + const partSize = int64(5 * 1024 * 1024) + const totalSize = int64(partCount) * partSize + + // Upload parts manually to capture per-part data for integrity checks + partNumbers := make([]int32, partCount) + partChecksums := make([][32]byte, 
partCount) + compParts := make([]types.CompletedPart, partCount) + + for i := range partCount { + partNumbers[i] = int32(i + 1) + buf := make([]byte, partSize) + for j := range buf { + buf[j] = byte(i*17 + j%251) + } + partChecksums[i] = sha256.Sum256(buf) + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + Body: bytes.NewReader(buf), + PartNumber: &partNumbers[i], + }) + cancel() + if err != nil { + return fmt.Errorf("upload part %d: %w", partNumbers[i], err) + } + compParts[i] = types.CompletedPart{ + ETag: res.ETag, + PartNumber: &partNumbers[i], + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + for i := range partCount { + pn := int32(i + 1) + startByte := int64(i) * partSize + endByte := startByte + partSize - 1 + expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize) + + ctx, cancel := context.WithTimeout(context.Background(), longTimeout) + res, err := s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + if err != nil { + cancel() + return fmt.Errorf("part %d: %w", pn, err) + } + + if res.PartsCount == nil { + cancel() + return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn) + } + if *res.PartsCount != int32(partCount) { + cancel() + return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount) + } + if getString(res.ContentRange) != expectedContentRange { + cancel() + return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange)) + } + 
if res.ContentLength == nil || *res.ContentLength != partSize { + cancel() + return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength) + } + if getString(res.AcceptRanges) != "bytes" { + cancel() + return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges)) + } + + body, readErr := io.ReadAll(res.Body) + cancel() + res.Body.Close() + if readErr != nil { + return fmt.Errorf("part %d: read body: %w", pn, readErr) + } + gotSum := sha256.Sum256(body) + if gotSum != partChecksums[i] { + return fmt.Errorf("part %d: data integrity check failed: body checksum mismatch", pn) + } + } + + return nil + }) +} + +func GetObject_non_mp_part_number_1_success(s *S3Conf) error { + testName := "GetObject_non_mp_part_number_1_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "put-object-part1" + const objSize = int64(1234) + + out, err := putObjectWithData(objSize, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } + + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), longTimeout) + res, err := s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + if err != nil { + cancel() + return err + } + + if res.ContentLength == nil || *res.ContentLength != objSize { + cancel() + return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength) + } + if getString(res.ContentRange) != "" { + cancel() + return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange)) + } + if res.PartsCount != nil { + cancel() + return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount) + } + if getString(res.AcceptRanges) != "bytes" { + cancel() + return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges)) + } + + body, readErr := 
io.ReadAll(res.Body) + cancel() + res.Body.Close() + if readErr != nil { + return fmt.Errorf("read body: %w", readErr) + } + if gotSum := sha256.Sum256(body); gotSum != out.csum { + return fmt.Errorf("data integrity check failed: body checksum mismatch") + } + + return nil }) } diff --git a/tests/integration/HeadObject.go b/tests/integration/HeadObject.go index 5536d0d5..5164288b 100644 --- a/tests/integration/HeadObject.go +++ b/tests/integration/HeadObject.go @@ -62,21 +62,6 @@ func HeadObject_invalid_part_number(s *S3Conf) error { }) } -func HeadObject_part_number_not_supported(s *S3Conf) error { - testName := "HeadObject_part_number_not_supported" - return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { - partNumber := int32(4) - ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - _, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{ - Bucket: &bucket, - Key: getPtr("my-obj"), - PartNumber: &partNumber, - }) - cancel() - return checkSdkApiErr(err, "NotImplemented") - }) -} - func HeadObject_non_existing_dir_object(s *S3Conf) error { testName := "HeadObject_non_existing_dir_object" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { @@ -930,6 +915,202 @@ func HeadObject_overrides_presign_success(s *S3Conf) error { }) } +func HeadObject_range_and_part_number(s *S3Conf) error { + testName := "HeadObject_range_and_part_number" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + _, err := putObjectWithData(100, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } + + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + Range: getPtr("bytes=0-9"), + PartNumber: &pn, + }) + cancel() + return checkSdkApiErr(err, "BadRequest") + }) +} + +func 
HeadObject_mp_part_number_exceeds_parts_count(s *S3Conf) error { + testName := "HeadObject_mp_part_number_exceeds_parts_count" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = int64(5) + parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := make([]types.CompletedPart, len(parts)) + for i, p := range parts { + compParts[i] = types.CompletedPart{ + ETag: p.ETag, + PartNumber: p.PartNumber, + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + // partNumber exceeds the number of parts in the completed upload + pn := int32(partCount + 1) + ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + return checkSdkApiErr(err, "RequestedRangeNotSatisfiable") + }) +} + +func HeadObject_mp_part_number_success(s *S3Conf) error { + testName := "HeadObject_mp_part_number_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = int64(3) + const partSize = int64(5 * 1024 * 1024) + const totalSize = partCount * partSize + + parts, _, err := uploadParts(s3client, totalSize, partCount, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := make([]types.CompletedPart, len(parts)) + for i, p := range parts { + compParts[i] 
= types.CompletedPart{ + ETag: p.ETag, + PartNumber: p.PartNumber, + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + for i := range partCount { + pn := int32(i + 1) + startByte := i * partSize + endByte := startByte + partSize - 1 + expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize) + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + if err != nil { + return fmt.Errorf("part %d: %w", pn, err) + } + + if res.PartsCount == nil { + return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn) + } + if *res.PartsCount != int32(partCount) { + return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount) + } + if getString(res.ContentRange) != expectedContentRange { + return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange)) + } + if res.ContentLength == nil || *res.ContentLength != partSize { + return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength) + } + if getString(res.AcceptRanges) != "bytes" { + return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges)) + } + } + + return nil + }) +} + +func HeadObject_non_mp_part_number_1_success(s *S3Conf) error { + testName := "HeadObject_non_mp_part_number_1_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "put-object-part1" + const objSize = int64(1234) + + _, err := putObjectWithData(objSize, 
&s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } + + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + if err != nil { + return err + } + + if res.ContentLength == nil || *res.ContentLength != objSize { + return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength) + } + if getString(res.ContentRange) != "" { + return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange)) + } + if res.PartsCount != nil { + return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount) + } + if getString(res.AcceptRanges) != "bytes" { + return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges)) + } + + return nil + }) +} + func HeadObject_overrides_fail_public(s *S3Conf) error { testName := "HeadObject_overrides_fail_public" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { diff --git a/tests/integration/group-tests.go b/tests/integration/group-tests.go index 988e84c9..a60567b7 100644 --- a/tests/integration/group-tests.go +++ b/tests/integration/group-tests.go @@ -201,7 +201,6 @@ func TestPutObject(ts *TestState) { func TestHeadObject(ts *TestState) { ts.Run(HeadObject_non_existing_object) ts.Run(HeadObject_invalid_part_number) - ts.Run(HeadObject_part_number_not_supported) ts.Run(HeadObject_directory_object_noslash) ts.Run(HeadObject_non_existing_dir_object) ts.Run(HeadObject_invalid_parent_dir) @@ -218,6 +217,10 @@ func TestHeadObject(ts *TestState) { ts.Run(HeadObject_overrides_success) ts.Run(HeadObject_overrides_presign_success) ts.Run(HeadObject_overrides_fail_public) + ts.Run(HeadObject_range_and_part_number) + ts.Run(HeadObject_mp_part_number_exceeds_parts_count) + 
ts.Run(HeadObject_mp_part_number_success) + ts.Run(HeadObject_non_mp_part_number_1_success) } func TestGetObjectAttributes(ts *TestState) { @@ -256,7 +259,10 @@ func TestGetObject(ts *TestState) { ts.Run(GetObject_overrides_presign_success) ts.Run(GetObject_overrides_fail_public) ts.Run(GetObject_invalid_part_number) - ts.Run(GetObject_part_number_not_supported) + ts.Run(GetObject_range_and_part_number) + ts.Run(GetObject_mp_part_number_exceeds_parts_count) + ts.Run(GetObject_mp_part_number_success) + ts.Run(GetObject_non_mp_part_number_1_success) } func TestListObjects(ts *TestState) { @@ -532,6 +538,7 @@ func TestCompleteMultipartUpload(ts *TestState) { ts.Run(CompleteMultipartUpload_with_metadata) } ts.Run(CompleteMultipartUpload_success) + ts.Run(CompleteMultipartUpload_already_completed) if !ts.conf.azureTests { ts.Run(CompleteMultipartUpload_racey_success) ts.Run(CompleteMultipartUpload_racey_data_integrity) @@ -1378,7 +1385,6 @@ func GetIntTests() IntTests { "PutObject_racey_success": PutObject_racey_success, "HeadObject_non_existing_object": HeadObject_non_existing_object, "HeadObject_invalid_part_number": HeadObject_invalid_part_number, - "HeadObject_part_number_not_supported": HeadObject_part_number_not_supported, "HeadObject_directory_object_noslash": HeadObject_directory_object_noslash, "HeadObject_non_existing_dir_object": HeadObject_non_existing_dir_object, "HeadObject_name_too_long": HeadObject_name_too_long, @@ -1393,6 +1399,10 @@ func GetIntTests() IntTests { "HeadObject_overrides_success": HeadObject_overrides_success, "HeadObject_overrides_presign_success": HeadObject_overrides_presign_success, "HeadObject_overrides_fail_public": HeadObject_overrides_fail_public, + "HeadObject_range_and_part_number": HeadObject_range_and_part_number, + "HeadObject_mp_part_number_exceeds_parts_count": HeadObject_mp_part_number_exceeds_parts_count, + "HeadObject_mp_part_number_success": HeadObject_mp_part_number_success, + "HeadObject_non_mp_part_number_1_success": 
HeadObject_non_mp_part_number_1_success, "GetObjectAttributes_non_existing_bucket": GetObjectAttributes_non_existing_bucket, "GetObjectAttributes_non_existing_object": GetObjectAttributes_non_existing_object, "GetObjectAttributes_invalid_attrs": GetObjectAttributes_invalid_attrs, @@ -1419,7 +1429,10 @@ func GetIntTests() IntTests { "GetObject_overrides_presign_success": GetObject_overrides_presign_success, "GetObject_overrides_fail_public": GetObject_overrides_fail_public, "GetObject_invalid_part_number": GetObject_invalid_part_number, - "GetObject_part_number_not_supported": GetObject_part_number_not_supported, + "GetObject_range_and_part_number": GetObject_range_and_part_number, + "GetObject_mp_part_number_exceeds_parts_count": GetObject_mp_part_number_exceeds_parts_count, + "GetObject_mp_part_number_success": GetObject_mp_part_number_success, + "GetObject_non_mp_part_number_1_success": GetObject_non_mp_part_number_1_success, "ListObjects_non_existing_bucket": ListObjects_non_existing_bucket, "ListObjects_with_prefix": ListObjects_with_prefix, "ListObjects_truncated": ListObjects_truncated, @@ -1612,6 +1625,7 @@ func GetIntTests() IntTests { "CompleteMultipartUpload_should_ignore_the_final_checksum": CompleteMultipartUpload_should_ignore_the_final_checksum, "CompleteMultipartUpload_should_succeed_without_final_checksum_type": CompleteMultipartUpload_should_succeed_without_final_checksum_type, "CompleteMultipartUpload_success": CompleteMultipartUpload_success, + "CompleteMultipartUpload_already_completed": CompleteMultipartUpload_already_completed, "CompleteMultipartUpload_racey_success": CompleteMultipartUpload_racey_success, "CompleteMultipartUpload_racey_data_integrity": CompleteMultipartUpload_racey_data_integrity, "PutBucketAcl_non_existing_bucket": PutBucketAcl_non_existing_bucket, diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 3f98e7dd..dd992453 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -2700,3 
// getEtagBytes decodes a hex ETag string (with or without surrounding
// quotes) into its raw bytes.
func getEtagBytes(etag string) ([]byte, error) {
	unquoted := strings.ReplaceAll(etag, `"`, "")
	return hex.DecodeString(unquoted)
}

// md5String returns the lowercase hex-encoded MD5 digest of data.
func md5String(data []byte) string {
	digest := md5.Sum(data)
	return hex.EncodeToString(digest[:])
}