diff --git a/backend/azure/azure.go b/backend/azure/azure.go index 2a5e82c3..847b03e5 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -73,6 +73,10 @@ const ( // keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata. // Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here. keyMpZeroBytesParts key = "Zerobytesparts" + // keyMpMetadata stores multipart upload part-offset metadata on the final + // committed blob so that GetObject/HeadObject can serve individual parts + // by part-number. + keyMpMetadata key = "Mpmetadata" defaultListingMaxKeys = 1000 ) @@ -89,6 +93,7 @@ func (key) Table() map[string]struct{} { "objectlegalhold": {}, "objname": {}, ".sgwtmp/multipart": {}, + "mpmetadata": {}, } } @@ -463,11 +468,6 @@ func (az *Azure) DeleteBucketTagging(ctx context.Context, bucket string) error { } func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { - if input.PartNumber != nil { - // querying an object with part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - client, err := az.getBlobClient(*input.Bucket, *input.Key) if err != nil { return nil, err @@ -492,8 +492,53 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G } var opts *azblob.DownloadStreamOptions - if *input.Range != "" { - offset, count, isValid, err := backend.ParseObjectRange(*resp.ContentLength, *input.Range) + var partsCount *int32 + var contentRange *string + + if input.PartNumber != nil { + // Serve a specific part if the object has multipart upload metadata. + // For non-multipart objects (no mp-metadata), partNumber=1 returns the + // full object with no Content-Range; any other partNumber is out of range. 
+ if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + var startOffset int64 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length := mpMeta.Parts[partNum-1] - startOffset + var objSize int64 + if resp.ContentLength != nil { + objSize = *resp.ContentLength + } + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize)) + opts = &azblob.DownloadStreamOptions{ + Range: blob.HTTPRange{ + Offset: startOffset, + Count: length, + }, + } + } else if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + // partNumber=1 on a non-multipart object: fall through and serve the + // full object without a range (opts remains nil, contentRange stays nil). 
+ } else if *input.Range != "" { + var objSize int64 + if resp.ContentLength != nil { + objSize = *resp.ContentLength + } + offset, count, isValid, err := backend.ParseObjectRange(objSize, *input.Range) if err != nil { return nil, err } @@ -504,8 +549,10 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G Offset: offset, }, } + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", offset, offset+count-1, objSize)) } } + blobDownloadResponse, err := az.client.DownloadStream(ctx, *input.Bucket, *input.Key, opts) if err != nil { return nil, azureErrToS3Err(err) @@ -529,18 +576,14 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G LastModified: blobDownloadResponse.LastModified, Metadata: parseAndFilterAzMetadata(blobDownloadResponse.Metadata), TagCount: &tagcount, - ContentRange: blobDownloadResponse.ContentRange, + ContentRange: contentRange, Body: blobDownloadResponse.Body, StorageClass: types.StorageClassStandard, + PartsCount: partsCount, }, nil } func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { - if input.PartNumber != nil { - // querying an object with part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - client, err := az.getBlobClient(*input.Bucket, *input.Key) if err != nil { return nil, err @@ -569,21 +612,57 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3 size = *resp.ContentLength } - startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) - if err != nil { - return nil, err - } - var contentRange string - if isValid { - contentRange = fmt.Sprintf("bytes %v-%v/%v", - startOffset, startOffset+length-1, size) + var length int64 + var partsCount *int32 + + if input.PartNumber != nil { + // Serve a specific part if the object has multipart upload metadata. 
+ // For non-multipart objects (no mp-metadata), partNumber=1 returns the + // full object with no Content-Range; any other partNumber is out of range. + if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + var startOffset int64 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length = mpMeta.Parts[partNum-1] - startOffset + contentRange = fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size) + } else if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } else { + // partNumber=1 on a non-multipart object: return full object size, + // no Content-Range, no PartsCount. + length = size + } + } else { + startOffset, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) + if err != nil { + return nil, err + } + length = lgth + if isValid { + contentRange = fmt.Sprintf("bytes %v-%v/%v", + startOffset, startOffset+length-1, size) + } } result := &s3.HeadObjectOutput{ ContentRange: &contentRange, AcceptRanges: backend.GetPtrFromString("bytes"), ContentLength: &length, + PartsCount: partsCount, ContentType: resp.ContentType, ContentEncoding: resp.ContentEncoding, ContentLanguage: resp.ContentLanguage, @@ -1680,7 +1759,29 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete props, err := blobClient.GetProperties(ctx, nil) if err != nil { - return res, "", parseMpError(err) + mpErr := parseMpError(err) + // If the tmp blob is already gone, the upload may have already been + // completed. 
Check the final object's mp-metadata and return success + // if the upload IDs match. + if errors.Is(mpErr, s3err.GetAPIError(s3err.ErrNoSuchUpload)) { + finalClient, clientErr := az.getBlobClient(*input.Bucket, *input.Key) + if clientErr == nil { + finalProps, propErr := finalClient.GetProperties(ctx, nil) + if propErr == nil { + if mpMetaStr, ok := finalProps.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil { + var mpMeta backend.MpUploadMetadata + if jsonErr := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); jsonErr == nil && mpMeta.UploadID == *input.UploadId { + return s3response.CompleteMultipartUploadResult{ + Bucket: input.Bucket, + Key: input.Key, + ETag: backend.GetPtrFromString(convertAzureEtag(finalProps.ETag)), + }, "", nil + } + } + } + } + } + return res, "", mpErr } tags, err := blobClient.GetTags(ctx, nil) if err != nil { @@ -1728,6 +1829,8 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete // The initial value is the lower limit of partNumber: 0 var totalSize int64 var partNumber int32 + // partSizes[i] = cumulative byte offset after part i+1 (see backend.MpUploadMetadata) + var partSizes []int64 last := len(input.MultipartUpload.Parts) - 1 for i, part := range input.MultipartUpload.Parts { if part.PartNumber == nil { @@ -1754,6 +1857,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall) } // Zero-byte parts contribute no data; skip adding to blockIds. 
+ partSizes = append(partSizes, totalSize) continue } return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) @@ -1768,6 +1872,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall) } totalSize += *block.Size + partSizes = append(partSizes, totalSize) blockIds = append(blockIds, *block.Name) } @@ -1779,6 +1884,18 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete // Remove internal tracking keys from metadata before storing on the final blob. delete(props.Metadata, string(keyMpZeroBytesParts)) + // Serialize multipart metadata so GetObject/HeadObject can serve by part-number. + mpMeta := backend.MpUploadMetadata{UploadID: *input.UploadId, Parts: partSizes} + mpMetaJSON, err := json.Marshal(mpMeta) + if err != nil { + return res, "", fmt.Errorf("marshal mp metadata: %w", err) + } + mpMetaStr := string(mpMetaJSON) + if props.Metadata == nil { + props.Metadata = map[string]*string{} + } + props.Metadata[string(keyMpMetadata)] = &mpMetaStr + opts := &blockblob.CommitBlockListOptions{ Metadata: props.Metadata, Tags: parseAzTags(tags.BlobTagSet), diff --git a/backend/common.go b/backend/common.go index 83c30b93..14fbc89f 100644 --- a/backend/common.go +++ b/backend/common.go @@ -384,21 +384,21 @@ func isValidTagComponent(str string) bool { return validTagComponent.Match([]byte(str)) } -func GetMultipartMD5(parts []types.CompletedPart) string { +func GetMultipartMD5(parts []types.CompletedPart) (string, error) { var partsEtagBytes []byte for _, part := range parts { - partsEtagBytes = append(partsEtagBytes, getEtagBytes(*part.ETag)...) + bts, err := getEtagBytes(*part.ETag) + if err != nil { + return "", fmt.Errorf("decode etag: %w", err) + } + partsEtagBytes = append(partsEtagBytes, bts...) 
} - return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)) + return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)), nil } -func getEtagBytes(etag string) []byte { - decode, err := hex.DecodeString(strings.ReplaceAll(etag, string('"'), "")) - if err != nil { - return []byte(etag) - } - return decode +func getEtagBytes(etag string) ([]byte, error) { + return hex.DecodeString(strings.ReplaceAll(etag, string('"'), "")) } func md5String(data []byte) string { @@ -406,6 +406,16 @@ func md5String(data []byte) string { return hex.EncodeToString(sum[:]) } +// MpUploadMetadata is stored alongside the final object after a multipart +// upload completes. It records the uploadId and the cumulative byte offsets +// of each part: parts[i] = sum of sizes of parts 1..i+1. This allows O(1) +// total size lookup (parts[last]) and O(1) part-start/size derivation for +// GetObject/HeadObject part-number requests. +type MpUploadMetadata struct { + UploadID string `json:"uploadId"` + Parts []int64 `json:"parts"` +} + type FileSectionReadCloser struct { R io.Reader F *os.File diff --git a/backend/posix/posix.go b/backend/posix/posix.go index bf4c9fa6..397babb1 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -136,6 +136,7 @@ const ( deleteMarkerKey = "delete-marker" versionIdKey = "version-id" partCrc64nvme = "part-crc64nvme" + mpMetaKey = "mp-metadata" nullVersionId = "null" @@ -1690,25 +1691,69 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C return res, "", fmt.Errorf("stat bucket: %w", err) } - sum, err := p.checkUploadIDExists(bucket, object, uploadID) - if err != nil { - return res, "", err - } - - // Atomically rename the upload directory to .inprogress so that - // concurrent CompleteMultipartUpload calls for the same upload ID see it as - // absent and return ErrNoSuchUpload rather than racing through the rest of - // the function. 
If this call does not succeed we rename it back (see the - deferred cleanup below). + // Rename the upload directory to atomically claim + the processing slot. A concurrent call with the same uploadId will compute + the same ETag, so it will either find the directory still present (still + processing) or gone (already completed) and react accordingly. + sum := sha256.Sum256([]byte(object)) objdirFull := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum)) uploadIDDir := filepath.Join(objdirFull, uploadID) - activeUploadName := uploadID + inProgressSuffix + // Calculate s3 compatible md5sum for complete multipart. + s3MD5, err := backend.GetMultipartMD5(parts) + if err != nil { + return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) + } + activeUploadName := fmt.Sprintf("%s.%s%s", uploadID, strings.Trim(s3MD5, "\""), inProgressSuffix) uploadIDInProgress := filepath.Join(objdirFull, activeUploadName) - if err := os.Rename(uploadIDDir, uploadIDInProgress); err != nil { - if errors.Is(err, fs.ErrNotExist) { - return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload) + + err = os.Rename(uploadIDDir, uploadIDInProgress) + if errors.Is(err, fs.ErrNotExist) { + // Another call already claimed this slot and is still assembling the object. + if _, statErr := os.Stat(uploadIDInProgress); statErr == nil { + // Still in progress — treat as success for idempotency. + return s3response.CompleteMultipartUploadResult{ + Bucket: &bucket, + ETag: &s3MD5, + Key: &object, + }, "", nil } - return res, "", fmt.Errorf("mark upload in-progress: %w", err) + // Directory is gone: the concurrent call already completed and cleaned up. + if _, statErr := os.Stat(filepath.Join(bucket, object)); statErr == nil { + return s3response.CompleteMultipartUploadResult{ + Bucket: &bucket, + ETag: &s3MD5, + Key: &object, + }, "", nil + } + + // Last resort: the object stat above may have lost a race with the + concurrent call's link step. 
Check the mp-metadata xattr, as this + // multipart upload may have been finalized and the final object has been created + // before or by the racing request + if mpMetaBytes, statErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey); statErr == nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil { + return res, "", fmt.Errorf("parse object multipart metadata: %w", err) + } + + // The object may have been overwritten by a newer upload or + // it's the result of a completely different multipart upload; only + // treat it as our completion if the upload IDs match. + if mpMeta.UploadID != uploadID { + return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload) + } + + return s3response.CompleteMultipartUploadResult{ + Bucket: &bucket, + ETag: &s3MD5, + Key: &object, + }, "", nil + } + + return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload) + } + if err != nil { + return res, "", fmt.Errorf("rename upload to etag dir: %w", err) } // Rename sidecar metadata to match the new data directory path. 
@@ -1761,11 +1806,23 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme } + // Initialize composite checksum reader + var compositeChecksumRdr *utils.CompositeChecksumReader + if checksums.Type == types.ChecksumTypeComposite { + compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm)))) + if err != nil { + return res, "", fmt.Errorf("initialize composite checksum reader: %w", err) + } + } + // check all parts ok last := len(parts) - 1 var totalsize int64 + // cumulative byte offsets: partSizes[i] = sum of sizes of parts 1..i+1 + var partSizes []int64 + var composableCsum string - // The initialie values is the lower limit of partNumber: 0 + // The initial value is the lower limit of partNumber: 0 var partNumber int32 for i, part := range parts { if part.PartNumber == nil { @@ -1788,6 +1845,7 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } totalsize += fi.Size() + partSizes = append(partSizes, totalsize) // all parts except the last need to be greater, than or equal to // the minimum allowed size (5 Mib) if i < last && fi.Size() < backend.MinPartSize { @@ -1813,18 +1871,86 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C if err != nil { return res, "", err } + + // Accumulate checksum state. 
+ switch checksums.Type { + case types.ChecksumTypeFullObject: + var pcs string + if mpChecksumType != "" { + pcs = getPartChecksum(checksums.Algorithm, part) + } else { + crc64nvme, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, partCrc64nvme) + if err != nil { + return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err) + } + pcs = string(crc64nvme) + } + if i == 0 { + composableCsum = pcs + } else { + composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, pcs, fi.Size()) + if err != nil { + return res, "", fmt.Errorf("add part %v checksum: %w", *part.PartNumber, err) + } + } + case types.ChecksumTypeComposite: + if err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)); err != nil { + return res, "", fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err) + } + } } if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize { return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize) } - var compositeChecksumRdr *utils.CompositeChecksumReader - if checksums.Type == types.ChecksumTypeComposite { - // initialize the composite checksum reader if the mp checksum type is COMPOSITE - compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm)))) - if err != nil { - return res, "", fmt.Errorf("initialize composite checksum reader: %w", err) + // Compute the final checksum value. 
+ var value string + switch checksums.Type { + case types.ChecksumTypeComposite: + value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts)) + case types.ChecksumTypeFullObject: + value = composableCsum + } + + var crc32 *string + var crc32c *string + var sha1 *string + var sha256 *string + var crc64nvme *string + var gotSum *string + + switch checksums.Algorithm { + case types.ChecksumAlgorithmCrc32: + gotSum = input.ChecksumCRC32 + checksums.CRC32 = &value + crc32 = &value + case types.ChecksumAlgorithmCrc32c: + gotSum = input.ChecksumCRC32C + checksums.CRC32C = &value + crc32c = &value + case types.ChecksumAlgorithmSha1: + gotSum = input.ChecksumSHA1 + checksums.SHA1 = &value + sha1 = &value + case types.ChecksumAlgorithmSha256: + gotSum = input.ChecksumSHA256 + checksums.SHA256 = &value + sha256 = &value + case types.ChecksumAlgorithmCrc64nvme: + gotSum = input.ChecksumCRC64NVME + checksums.CRC64NVME = &value + crc64nvme = &value + } + + // Check if the provided checksum and the calculated one are the same. 
+ if mpChecksumType != "" && gotSum != nil { + s := *gotSum + if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") { + s = fmt.Sprintf("%s-%v", s, len(parts)) + } + if s != value { + return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm) } } @@ -1838,56 +1964,14 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } defer f.cleanup() - var composableCsum string var abortOnErrSet bool - for i, part := range parts { + for _, part := range parts { partObjPath := filepath.Join(objdir, activeUploadName, fmt.Sprintf("%v", *part.PartNumber)) fullPartPath := filepath.Join(bucket, partObjPath) pf, err := os.Open(fullPartPath) if err != nil { return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err) } - pfi, err := pf.Stat() - if err != nil { - pf.Close() - return res, "", fmt.Errorf("stat part %v: %v", *part.PartNumber, err) - } - - switch checksums.Type { - case types.ChecksumTypeFullObject: - var partChecksum string - if mpChecksumType != "" { - // if any checksum has been initially specified on mp creation - // read the part checksum configuration - partChecksum = getPartChecksum(checksums.Algorithm, part) - } else { - // if no checksum has been specified on mp creation - // retrieve the internally stored crc64nvme - crc64nvme, err := p.meta.RetrieveAttribute(pf, bucket, partObjPath, partCrc64nvme) - if err != nil { - pf.Close() - return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err) - } - partChecksum = string(crc64nvme) - } - if i == 0 { - composableCsum = partChecksum - break - } - composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, partChecksum, pfi.Size()) - if err != nil { - pf.Close() - return res, "", fmt.Errorf("add part %v checksum: %w", - *part.PartNumber, err) - } - case types.ChecksumTypeComposite: - err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)) - if err != nil { - pf.Close() - return res, "", 
fmt.Errorf("process %v part checksum: %w", - *part.PartNumber, err) - } - } if customCopy != nil { idemp, err := customCopy(pf, f.File()) @@ -2004,59 +2088,6 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } } - var crc32 *string - var crc32c *string - var sha1 *string - var sha256 *string - var crc64nvme *string - - var value string - switch checksums.Type { - case types.ChecksumTypeComposite: - value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts)) - case types.ChecksumTypeFullObject: - value = composableCsum - } - - var gotSum *string - - switch checksums.Algorithm { - case types.ChecksumAlgorithmCrc32: - gotSum = input.ChecksumCRC32 - checksums.CRC32 = &value - crc32 = &value - case types.ChecksumAlgorithmCrc32c: - gotSum = input.ChecksumCRC32C - checksums.CRC32C = &value - crc32c = &value - case types.ChecksumAlgorithmSha1: - gotSum = input.ChecksumSHA1 - checksums.SHA1 = &value - sha1 = &value - case types.ChecksumAlgorithmSha256: - gotSum = input.ChecksumSHA256 - checksums.SHA256 = &value - sha256 = &value - case types.ChecksumAlgorithmCrc64nvme: - gotSum = input.ChecksumCRC64NVME - checksums.CRC64NVME = &value - crc64nvme = &value - } - - // Check if the provided checksum and the calculated one are the same - if mpChecksumType != "" && gotSum != nil { - s := *gotSum - if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") { - // if number of parts is not specified in the final checksum - // make sure to add, to not fail in the final comparison - s = fmt.Sprintf("%s-%v", s, len(parts)) - } - - if s != value { - return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm) - } - } - err = p.storeChecksums(f.File(), bucket, object, checksums) if err != nil { return res, "", fmt.Errorf("store object checksum: %w", err) @@ -2074,14 +2105,24 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C } } - // Calculate s3 compatible md5sum for complete multipart. 
- s3MD5 := backend.GetMultipartMD5(parts) - err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5)) if err != nil { return res, "", fmt.Errorf("set etag attr: %w", err) } + // Store multipart upload metadata on the final object so that GetObject / + // HeadObject can serve individual parts by part-number. + mpMeta := backend.MpUploadMetadata{UploadID: uploadID, Parts: partSizes} + mpMetaJSON, err := json.Marshal(mpMeta) + if err != nil { + return res, "", fmt.Errorf("marshal object multipart metadata: %w", err) + } + + err = p.meta.StoreAttribute(f.File(), bucket, object, mpMetaKey, mpMetaJSON) + if err != nil { + return res, "", fmt.Errorf("set object multipart metadata: %w", err) + } + err = f.link() if err != nil { return res, "", fmt.Errorf("link object in namespace: %w", err) @@ -4182,11 +4223,6 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge return nil, err } - if input.PartNumber != nil { - // querying an object by part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - if !p.versioningEnabled() && versionId != "" { //TODO: Maybe we need to return our custom error here? return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -4370,15 +4406,59 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge } objSize := fi.Size() - startOffset, length, isValid, err := backend.ParseObjectRange(objSize, *input.Range) - if err != nil { - return nil, err + if fi.IsDir() { + objSize = 0 } - var contentRange string - if isValid { - contentRange = fmt.Sprintf("bytes %v-%v/%v", - startOffset, startOffset+length-1, objSize) + var contentRange *string + var startOffset, length int64 + var partsCount *int32 + + // If partNumber is requested and mp-metadata exists, serve that specific part. + // For non-multipart objects (no mp-metadata), partNumber=1 returns the full + // object with no Content-Range; any other partNumber is out of range. 
+ // Both range read and partNumber can't be used together. + if input.PartNumber != nil { + mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey) + if metaErr == nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + // Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length = mpMeta.Parts[partNum-1] - startOffset + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize)) + } else if errors.Is(metaErr, meta.ErrNoSuchKey) { + // Non-multipart object: partNumber=1 means the whole object; anything + // higher is out of range + if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + length = objSize + } else { + return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr) + } + } else { + start, lgth, isValid, err := backend.ParseObjectRange(objSize, getString(input.Range)) + if err != nil { + return nil, err + } + startOffset, length = start, lgth + + if isValid { + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, objSize)) + } } objMeta := p.loadObjectMetaProperties(f, bucket, object, &fi) @@ -4422,7 +4502,7 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge LastModified: backend.GetTimePtr(fi.ModTime()), Metadata: objMeta.Metadata, TagCount: tagCount, - ContentRange: &contentRange, + ContentRange: contentRange, StorageClass: types.StorageClassStandard, VersionId: &versionId, Body: body, @@ -4432,6 +4512,7 @@ func (p *Posix) GetObject(ctx context.Context, input 
*s3.GetObjectInput) (*s3.Ge ChecksumSHA256: checksums.SHA256, ChecksumCRC64NVME: checksums.CRC64NVME, ChecksumType: checksums.Type, + PartsCount: partsCount, }, nil } @@ -4451,11 +4532,6 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. return nil, err } - if input.PartNumber != nil { - // querying an object by part number is not supported - return nil, s3err.GetAPIError(s3err.ErrNotImplemented) - } - if !p.versioningEnabled() && versionId != "" { //TODO: Maybe we need to return our custom error here? return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -4569,15 +4645,55 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. size = 0 } - startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) - if err != nil { - return nil, err - } + var contentRange *string + var startOffset, length int64 + var partsCount *int32 - var contentRange string - if isValid { - contentRange = fmt.Sprintf("bytes %v-%v/%v", - startOffset, startOffset+length-1, size) + // If partNumber is requested and mp-metadata exists, serve that specific part. + // For non-multipart objects (no mp-metadata), partNumber=1 returns the full + // object with no Content-Range; any other partNumber is out of range. + // Both range read and partNumber can't be used together. 
+ if input.PartNumber != nil { + mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey) + if metaErr == nil { + var mpMeta backend.MpUploadMetadata + if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil { + return nil, fmt.Errorf("parse object multipart metadata: %w", err) + } + + partNum := *input.PartNumber + totalParts := int32(len(mpMeta.Parts)) + partsCount = &totalParts + if partNum > totalParts { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + + // Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1 + if partNum > 1 { + startOffset = mpMeta.Parts[partNum-2] + } + length = mpMeta.Parts[partNum-1] - startOffset + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size)) + } else if errors.Is(metaErr, meta.ErrNoSuchKey) { + // Non-multipart object: partNumber=1 means the whole object; anything + // higher is out of range + if *input.PartNumber > 1 { + return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange) + } + length = size + } else { + return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr) + } + } else { + start, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range)) + if err != nil { + return nil, err + } + startOffset, length = start, lgth + + if isValid { + contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, size)) + } } var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus @@ -4622,7 +4738,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. return &s3.HeadObjectOutput{ ContentLength: &length, AcceptRanges: backend.GetPtrFromString("bytes"), - ContentRange: &contentRange, + ContentRange: contentRange, ContentType: objMeta.ContentType, ContentEncoding: objMeta.ContentEncoding, ContentDisposition: objMeta.ContentDisposition, @@ -4644,6 +4760,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. 
ChecksumCRC64NVME: checksums.CRC64NVME, ChecksumType: checksums.Type, TagCount: tagCount, + PartsCount: partsCount, }, nil } diff --git a/s3api/controllers/object-get.go b/s3api/controllers/object-get.go index b5330d21..5aaea3f1 100644 --- a/s3api/controllers/object-get.go +++ b/s3api/controllers/object-get.go @@ -460,6 +460,15 @@ func (c S3ApiController) GetObject(ctx *fiber.Ctx) (*Response, error) { }, s3err.GetAPIError(s3err.ErrInvalidPartNumber) } + if acceptRange != "" { + debuglogger.Logf("Range and partNumber cannot both be specified") + return &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: parsedAcl.Owner, + }, + }, s3err.GetAPIError(s3err.ErrRangeAndPartNumber) + } + partNumber = &partNumberQuery } diff --git a/s3api/controllers/object-get_test.go b/s3api/controllers/object-get_test.go index 27bbd874..06950f0e 100644 --- a/s3api/controllers/object-get_test.go +++ b/s3api/controllers/object-get_test.go @@ -722,6 +722,26 @@ func TestS3ApiController_GetObject(t *testing.T) { err: s3err.GetAPIError(s3err.ErrInvalidPartNumber), }, }, + { + name: "both partNumber and Range", + input: testInput{ + locals: defaultLocals, + queries: map[string]string{ + "partNumber": "2", + }, + headers: map[string]string{ + "Range": "bytes=10-20", + }, + }, + output: testOutput{ + response: &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: "root", + }, + }, + err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber), + }, + }, { name: "backend returns error", input: testInput{ diff --git a/s3api/controllers/object-head.go b/s3api/controllers/object-head.go index 96719285..e4c63951 100644 --- a/s3api/controllers/object-head.go +++ b/s3api/controllers/object-head.go @@ -107,6 +107,15 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) (*Response, error) { }, s3err.GetAPIError(s3err.ErrInvalidPartNumber) } + if objRange != "" { + debuglogger.Logf("Range and partNumber cannot both be specified") + return &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: parsedAcl.Owner, + }, + 
}, s3err.GetAPIError(s3err.ErrRangeAndPartNumber) + } + partNumber = &partNumberQuery } diff --git a/s3api/controllers/object-head_test.go b/s3api/controllers/object-head_test.go index f7ff245d..f62859d1 100644 --- a/s3api/controllers/object-head_test.go +++ b/s3api/controllers/object-head_test.go @@ -98,6 +98,26 @@ func TestS3ApiController_HeadObject(t *testing.T) { err: s3err.GetAPIError(s3err.ErrInvalidPartNumber), }, }, + { + name: "both partNumber and Range", + input: testInput{ + locals: defaultLocals, + queries: map[string]string{ + "partNumber": "6", + }, + headers: map[string]string{ + "Range": "bytes=1-3", + }, + }, + output: testOutput{ + response: &Response{ + MetaOpts: &MetaOptions{ + BucketOwner: "root", + }, + }, + err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber), + }, + }, { name: "invalid checksum mode", input: testInput{ diff --git a/s3err/s3err.go b/s3err/s3err.go index a0fb2cb5..ae9de4fd 100644 --- a/s3err/s3err.go +++ b/s3err/s3err.go @@ -81,6 +81,8 @@ const ( ErrInvalidObjectAttributes ErrInvalidPart ErrInvalidPartNumber + ErrInvalidPartNumberRange + ErrRangeAndPartNumber ErrInvalidPartOrder ErrInvalidCompleteMpPartNumber ErrInternalError @@ -339,6 +341,16 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "Part number must be an integer between 1 and 10000, inclusive.", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidPartNumberRange: { + Code: "InvalidPartNumber", + Description: "The requested partnumber is not satisfiable.", + HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable, + }, + ErrRangeAndPartNumber: { + Code: "InvalidRequest", + Description: "Cannot specify both Range header and partNumber query parameter", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInvalidPartOrder: { Code: "InvalidPartOrder", Description: "The list of parts was not in ascending order. 
Parts must be ordered by part number.", diff --git a/tests/integration/CompleteMultipartUpload.go b/tests/integration/CompleteMultipartUpload.go index fb838ef7..78b621ab 100644 --- a/tests/integration/CompleteMultipartUpload.go +++ b/tests/integration/CompleteMultipartUpload.go @@ -1907,13 +1907,77 @@ func CompleteMultipartUpload_racey_success(s *S3Conf) error { }) } +func CompleteMultipartUpload_already_completed(s *S3Conf) error { + testName := "CompleteMultipartUpload_already_completed" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + parts, _, err := uploadParts(s3client, 5*1024*1024, 1, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := []types.CompletedPart{} + for _, el := range parts { + compParts = append(compParts, types.CompletedPart{ + ETag: el.ETag, + PartNumber: el.PartNumber, + }) + } + + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + first, err := s3client.CompleteMultipartUpload(ctx, completeInput) + cancel() + if err != nil { + return err + } + + // Second call with the same upload ID should also succeed (idempotent). 
+ ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) + second, err := s3client.CompleteMultipartUpload(ctx, completeInput) + cancel() + if err != nil { + return err + } + + if getString(second.Bucket) != bucket { + return fmt.Errorf("expected Bucket to be %s, instead got %s", bucket, getString(second.Bucket)) + } + if getString(second.Key) != obj { + return fmt.Errorf("expected Key to be %s, instead got %s", obj, getString(second.Key)) + } + if getString(second.ETag) != getString(first.ETag) { + return fmt.Errorf("expected ETag to be %s, instead got %s", getString(first.ETag), getString(second.ETag)) + } + location := constructObjectLocation(s.endpoint, bucket, obj, s.hostStyle) + if getString(second.Location) != location { + return fmt.Errorf("expected Location to be %s, instead got %s", location, getString(second.Location)) + } + + return nil + }) +} + // CompleteMultipartUpload_racey_data_integrity creates a single multipart // upload, uploads its parts, then fires multiple concurrent -// CompleteMultipartUpload calls for the exact same upload ID. Exactly one -// call must succeed; the rest must fail with NoSuchUpload (the upload was -// already consumed). The surviving object must contain exactly the data that -// was uploaded. The whole sequence is repeated several times so that -// intermittent scheduling cannot hide a data-corruption bug. +// CompleteMultipartUpload calls for the exact same upload ID. All calls must +// succeed and return the same ETag (idempotent completion). The surviving +// object must contain exactly the data that was uploaded. The whole sequence +// is repeated several times so that intermittent scheduling cannot hide a +// data-corruption bug. 
func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { testName := "CompleteMultipartUpload_racey_data_integrity" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { @@ -1924,7 +1988,6 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { ) obj := "my-obj" objSize := int64(15 * 1024 * 1024) // 3 × 5 MiB parts - noSuchUpload := s3err.GetAPIError(s3err.ErrNoSuchUpload) for iter := range iterations { // Phase 1: create one upload and upload its parts. @@ -1938,25 +2001,26 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { return fmt.Errorf("iteration %d: upload parts: %w", iter, err) } + var partsEtagBytes []byte compParts := make([]types.CompletedPart, 0, len(parts)) for _, el := range parts { + b, err := getEtagBytes(*el.ETag) + if err != nil { + return fmt.Errorf("iteration %d: %w", iter, err) + } + partsEtagBytes = append(partsEtagBytes, b...) compParts = append(compParts, types.CompletedPart{ ETag: el.ETag, PartNumber: el.PartNumber, }) } + expectedETag := fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)) - // Phase 2: race concurrency complete calls for the same upload ID. - // Exactly one must succeed; the others must return NoSuchUpload. - var ( - mu sync.Mutex - successCnt int - ) eg := errgroup.Group{} for range concurrency { eg.Go(func() error { ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - _, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + res, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ Bucket: &bucket, Key: &obj, UploadId: out.UploadId, @@ -1965,23 +2029,20 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error { }, }) cancel() - if err == nil { - mu.Lock() - successCnt++ - mu.Unlock() - return nil + if err != nil { + return err } - // Every non-zero error must be NoSuchUpload. 
- return checkApiErr(err, noSuchUpload) + + if getString(res.ETag) != expectedETag { + return fmt.Errorf("expected the multipart upload ETag to be %s, instead got %s", expectedETag, getString(res.ETag)) + } + + return nil }) } if err := eg.Wait(); err != nil { return fmt.Errorf("iteration %d: complete phase: %w", iter, err) } - if successCnt != 1 { - return fmt.Errorf("iteration %d: expected exactly 1 successful complete, got %d", - iter, successCnt) - } // Phase 3: download the object and verify it is complete and // uncorrupted — its checksum must match what was uploaded. diff --git a/tests/integration/GetObject.go b/tests/integration/GetObject.go index 6ee11397..1a5be2ca 100644 --- a/tests/integration/GetObject.go +++ b/tests/integration/GetObject.go @@ -15,6 +15,7 @@ package integration import ( + "bytes" "context" "crypto/sha256" "errors" @@ -1256,17 +1257,246 @@ func GetObject_invalid_part_number(s *S3Conf) error { }) } -func GetObject_part_number_not_supported(s *S3Conf) error { - testName := "GetObject_part_number_not_supported" +func GetObject_range_and_part_number(s *S3Conf) error { + testName := "GetObject_range_and_part_number" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { - ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - defer cancel() - _, err := s3client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: &bucket, - Key: getPtr("obj"), - PartNumber: getPtr(int32(3)), - }) + obj := "my-obj" + _, err := putObjectWithData(100, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } - return checkApiErr(err, s3err.GetAPIError(s3err.ErrNotImplemented)) + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + Range: getPtr("bytes=0-9"), + PartNumber: &pn, + }) + cancel() + return checkApiErr(err, 
s3err.GetAPIError(s3err.ErrRangeAndPartNumber)) + }) +} + +func GetObject_mp_part_number_exceeds_parts_count(s *S3Conf) error { + testName := "GetObject_mp_part_number_exceeds_parts_count" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = int64(5) + parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := make([]types.CompletedPart, len(parts)) + for i, p := range parts { + compParts[i] = types.CompletedPart{ + ETag: p.ETag, + PartNumber: p.PartNumber, + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + // partNumber exceeds the number of parts in the completed upload + pn := int32(partCount + 1) + ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + return checkApiErr(err, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)) + }) +} + +func GetObject_mp_part_number_success(s *S3Conf) error { + testName := "GetObject_mp_part_number_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = 3 + const partSize = int64(5 * 1024 * 1024) + const totalSize = int64(partCount) * partSize + + // Upload parts manually to capture per-part data for integrity checks + partNumbers := make([]int32, partCount) + partChecksums := make([][32]byte, 
partCount) + compParts := make([]types.CompletedPart, partCount) + + for i := range partCount { + partNumbers[i] = int32(i + 1) + buf := make([]byte, partSize) + for j := range buf { + buf[j] = byte(i*17 + j%251) + } + partChecksums[i] = sha256.Sum256(buf) + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + Body: bytes.NewReader(buf), + PartNumber: &partNumbers[i], + }) + cancel() + if err != nil { + return fmt.Errorf("upload part %d: %w", partNumbers[i], err) + } + compParts[i] = types.CompletedPart{ + ETag: res.ETag, + PartNumber: &partNumbers[i], + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + for i := range partCount { + pn := int32(i + 1) + startByte := int64(i) * partSize + endByte := startByte + partSize - 1 + expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize) + + ctx, cancel := context.WithTimeout(context.Background(), longTimeout) + res, err := s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + if err != nil { + cancel() + return fmt.Errorf("part %d: %w", pn, err) + } + + if res.PartsCount == nil { + cancel() + return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn) + } + if *res.PartsCount != int32(partCount) { + cancel() + return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount) + } + if getString(res.ContentRange) != expectedContentRange { + cancel() + return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange)) + } + 
if res.ContentLength == nil || *res.ContentLength != partSize { + cancel() + return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength) + } + if getString(res.AcceptRanges) != "bytes" { + cancel() + return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges)) + } + + body, readErr := io.ReadAll(res.Body) + cancel() + res.Body.Close() + if readErr != nil { + return fmt.Errorf("part %d: read body: %w", pn, readErr) + } + gotSum := sha256.Sum256(body) + if gotSum != partChecksums[i] { + return fmt.Errorf("part %d: data integrity check failed: body checksum mismatch", pn) + } + } + + return nil + }) +} + +func GetObject_non_mp_part_number_1_success(s *S3Conf) error { + testName := "GetObject_non_mp_part_number_1_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "put-object-part1" + const objSize = int64(1234) + + out, err := putObjectWithData(objSize, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } + + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), longTimeout) + res, err := s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + if err != nil { + cancel() + return err + } + + if res.ContentLength == nil || *res.ContentLength != objSize { + cancel() + return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength) + } + if getString(res.ContentRange) != "" { + cancel() + return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange)) + } + if res.PartsCount != nil { + cancel() + return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount) + } + if getString(res.AcceptRanges) != "bytes" { + cancel() + return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges)) + } + + body, readErr := 
io.ReadAll(res.Body) + cancel() + res.Body.Close() + if readErr != nil { + return fmt.Errorf("read body: %w", readErr) + } + if gotSum := sha256.Sum256(body); gotSum != out.csum { + return fmt.Errorf("data integrity check failed: body checksum mismatch") + } + + return nil }) } diff --git a/tests/integration/HeadObject.go b/tests/integration/HeadObject.go index 5536d0d5..5164288b 100644 --- a/tests/integration/HeadObject.go +++ b/tests/integration/HeadObject.go @@ -62,21 +62,6 @@ func HeadObject_invalid_part_number(s *S3Conf) error { }) } -func HeadObject_part_number_not_supported(s *S3Conf) error { - testName := "HeadObject_part_number_not_supported" - return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { - partNumber := int32(4) - ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) - _, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{ - Bucket: &bucket, - Key: getPtr("my-obj"), - PartNumber: &partNumber, - }) - cancel() - return checkSdkApiErr(err, "NotImplemented") - }) -} - func HeadObject_non_existing_dir_object(s *S3Conf) error { testName := "HeadObject_non_existing_dir_object" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { @@ -930,6 +915,202 @@ func HeadObject_overrides_presign_success(s *S3Conf) error { }) } +func HeadObject_range_and_part_number(s *S3Conf) error { + testName := "HeadObject_range_and_part_number" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + _, err := putObjectWithData(100, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } + + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + Range: getPtr("bytes=0-9"), + PartNumber: &pn, + }) + cancel() + return checkSdkApiErr(err, "BadRequest") + }) +} + +func 
HeadObject_mp_part_number_exceeds_parts_count(s *S3Conf) error { + testName := "HeadObject_mp_part_number_exceeds_parts_count" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = int64(5) + parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := make([]types.CompletedPart, len(parts)) + for i, p := range parts { + compParts[i] = types.CompletedPart{ + ETag: p.ETag, + PartNumber: p.PartNumber, + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + // partNumber exceeds the number of parts in the completed upload + pn := int32(partCount + 1) + ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + return checkSdkApiErr(err, "RequestedRangeNotSatisfiable") + }) +} + +func HeadObject_mp_part_number_success(s *S3Conf) error { + testName := "HeadObject_mp_part_number_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + const partCount = int64(3) + const partSize = int64(5 * 1024 * 1024) + const totalSize = partCount * partSize + + parts, _, err := uploadParts(s3client, totalSize, partCount, bucket, obj, *out.UploadId) + if err != nil { + return err + } + + compParts := make([]types.CompletedPart, len(parts)) + for i, p := range parts { + compParts[i] 
= types.CompletedPart{ + ETag: p.ETag, + PartNumber: p.PartNumber, + } + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: out.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: compParts, + }, + }) + cancel() + if err != nil { + return err + } + + for i := range partCount { + pn := int32(i + 1) + startByte := i * partSize + endByte := startByte + partSize - 1 + expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize) + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + if err != nil { + return fmt.Errorf("part %d: %w", pn, err) + } + + if res.PartsCount == nil { + return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn) + } + if *res.PartsCount != int32(partCount) { + return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount) + } + if getString(res.ContentRange) != expectedContentRange { + return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange)) + } + if res.ContentLength == nil || *res.ContentLength != partSize { + return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength) + } + if getString(res.AcceptRanges) != "bytes" { + return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges)) + } + } + + return nil + }) +} + +func HeadObject_non_mp_part_number_1_success(s *S3Conf) error { + testName := "HeadObject_non_mp_part_number_1_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "put-object-part1" + const objSize = int64(1234) + + _, err := putObjectWithData(objSize, 
&s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }, s3client) + if err != nil { + return err + } + + pn := int32(1) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: &bucket, + Key: &obj, + PartNumber: &pn, + }) + cancel() + if err != nil { + return err + } + + if res.ContentLength == nil || *res.ContentLength != objSize { + return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength) + } + if getString(res.ContentRange) != "" { + return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange)) + } + if res.PartsCount != nil { + return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount) + } + if getString(res.AcceptRanges) != "bytes" { + return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges)) + } + + return nil + }) +} + func HeadObject_overrides_fail_public(s *S3Conf) error { testName := "HeadObject_overrides_fail_public" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { diff --git a/tests/integration/group-tests.go b/tests/integration/group-tests.go index 97d4d385..f943719b 100644 --- a/tests/integration/group-tests.go +++ b/tests/integration/group-tests.go @@ -201,7 +201,6 @@ func TestPutObject(ts *TestState) { func TestHeadObject(ts *TestState) { ts.Run(HeadObject_non_existing_object) ts.Run(HeadObject_invalid_part_number) - ts.Run(HeadObject_part_number_not_supported) ts.Run(HeadObject_directory_object_noslash) ts.Run(HeadObject_non_existing_dir_object) ts.Run(HeadObject_invalid_parent_dir) @@ -218,6 +217,10 @@ func TestHeadObject(ts *TestState) { ts.Run(HeadObject_overrides_success) ts.Run(HeadObject_overrides_presign_success) ts.Run(HeadObject_overrides_fail_public) + ts.Run(HeadObject_range_and_part_number) + ts.Run(HeadObject_mp_part_number_exceeds_parts_count) + 
ts.Run(HeadObject_mp_part_number_success) + ts.Run(HeadObject_non_mp_part_number_1_success) } func TestGetObjectAttributes(ts *TestState) { @@ -256,7 +259,10 @@ func TestGetObject(ts *TestState) { ts.Run(GetObject_overrides_presign_success) ts.Run(GetObject_overrides_fail_public) ts.Run(GetObject_invalid_part_number) - ts.Run(GetObject_part_number_not_supported) + ts.Run(GetObject_range_and_part_number) + ts.Run(GetObject_mp_part_number_exceeds_parts_count) + ts.Run(GetObject_mp_part_number_success) + ts.Run(GetObject_non_mp_part_number_1_success) } func TestListObjects(ts *TestState) { @@ -537,6 +543,7 @@ func TestCompleteMultipartUpload(ts *TestState) { ts.Run(CompleteMultipartUpload_with_metadata) } ts.Run(CompleteMultipartUpload_success) + ts.Run(CompleteMultipartUpload_already_completed) if !ts.conf.azureTests { ts.Run(CompleteMultipartUpload_racey_success) ts.Run(CompleteMultipartUpload_racey_data_integrity) @@ -1383,7 +1390,6 @@ func GetIntTests() IntTests { "PutObject_racey_success": PutObject_racey_success, "HeadObject_non_existing_object": HeadObject_non_existing_object, "HeadObject_invalid_part_number": HeadObject_invalid_part_number, - "HeadObject_part_number_not_supported": HeadObject_part_number_not_supported, "HeadObject_directory_object_noslash": HeadObject_directory_object_noslash, "HeadObject_non_existing_dir_object": HeadObject_non_existing_dir_object, "HeadObject_name_too_long": HeadObject_name_too_long, @@ -1398,6 +1404,10 @@ func GetIntTests() IntTests { "HeadObject_overrides_success": HeadObject_overrides_success, "HeadObject_overrides_presign_success": HeadObject_overrides_presign_success, "HeadObject_overrides_fail_public": HeadObject_overrides_fail_public, + "HeadObject_range_and_part_number": HeadObject_range_and_part_number, + "HeadObject_mp_part_number_exceeds_parts_count": HeadObject_mp_part_number_exceeds_parts_count, + "HeadObject_mp_part_number_success": HeadObject_mp_part_number_success, + "HeadObject_non_mp_part_number_1_success": 
HeadObject_non_mp_part_number_1_success, "GetObjectAttributes_non_existing_bucket": GetObjectAttributes_non_existing_bucket, "GetObjectAttributes_non_existing_object": GetObjectAttributes_non_existing_object, "GetObjectAttributes_invalid_attrs": GetObjectAttributes_invalid_attrs, @@ -1424,7 +1434,10 @@ func GetIntTests() IntTests { "GetObject_overrides_presign_success": GetObject_overrides_presign_success, "GetObject_overrides_fail_public": GetObject_overrides_fail_public, "GetObject_invalid_part_number": GetObject_invalid_part_number, - "GetObject_part_number_not_supported": GetObject_part_number_not_supported, + "GetObject_range_and_part_number": GetObject_range_and_part_number, + "GetObject_mp_part_number_exceeds_parts_count": GetObject_mp_part_number_exceeds_parts_count, + "GetObject_mp_part_number_success": GetObject_mp_part_number_success, + "GetObject_non_mp_part_number_1_success": GetObject_non_mp_part_number_1_success, "ListObjects_non_existing_bucket": ListObjects_non_existing_bucket, "ListObjects_with_prefix": ListObjects_with_prefix, "ListObjects_truncated": ListObjects_truncated, @@ -1626,6 +1639,7 @@ func GetIntTests() IntTests { "CompleteMultipartUpload_should_ignore_the_final_checksum": CompleteMultipartUpload_should_ignore_the_final_checksum, "CompleteMultipartUpload_should_succeed_without_final_checksum_type": CompleteMultipartUpload_should_succeed_without_final_checksum_type, "CompleteMultipartUpload_success": CompleteMultipartUpload_success, + "CompleteMultipartUpload_already_completed": CompleteMultipartUpload_already_completed, "CompleteMultipartUpload_racey_success": CompleteMultipartUpload_racey_success, "CompleteMultipartUpload_racey_data_integrity": CompleteMultipartUpload_racey_data_integrity, "PutBucketAcl_non_existing_bucket": PutBucketAcl_non_existing_bucket, diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 3f98e7dd..dd992453 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -2700,3 
+2700,12 @@ func sendPostObject(input PostRequestConfig) (*http.Response, error) { return input.s3Conf.httpClient.Do(req) } + +func getEtagBytes(etag string) ([]byte, error) { + return hex.DecodeString(strings.ReplaceAll(etag, string('"'), "")) +} + +func md5String(data []byte) string { + sum := md5.Sum(data) + return hex.EncodeToString(sum[:]) +}