fix: make CompleteMultipartUpload idempotent and add part-number support to GetObject/HeadObject

Closes #1064

Use the multipart ETag as the in-progress directory suffix instead of the static `.inprogress` marker so that concurrent CompleteMultipartUpload calls for the same upload ID are all treated as successful (idempotent) rather than racing, where only one succeeded and the rest returned NoSuchUpload.

After finalizing the multipart upload, store an `mp-metadata` xattr on the assembled object that records the upload ID and cumulative byte offsets for each part. GetObject and HeadObject now use this metadata to serve individual part ranges via the `partNumber` query parameter, returning a successful response instead of returning NotImplemented.

Add two new S3 error codes:
- `ErrInvalidPartNumberRange` (416 RequestedRangeNotSatisfiable) — returned
  when the requested part number exceeds the number of parts in the upload.
- `ErrRangeAndPartNumber` (400 BadRequest) — returned when both a Range header
  and a partNumber query parameter are specified on the same request.
This commit is contained in:
niksis02
2026-04-16 23:25:48 +04:00
parent 6d3688adf9
commit 62e8cddbc7
13 changed files with 1041 additions and 232 deletions

View File

@@ -73,6 +73,10 @@ const (
// keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata.
// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here.
keyMpZeroBytesParts key = "Zerobytesparts"
// keyMpMetadata stores multipart upload part-offset metadata on the final
// committed blob so that GetObject/HeadObject can serve individual parts
// by part-number.
keyMpMetadata key = "Mpmetadata"
defaultListingMaxKeys = 1000
)
@@ -89,6 +93,7 @@ func (key) Table() map[string]struct{} {
"objectlegalhold": {},
"objname": {},
".sgwtmp/multipart": {},
"mpmetadata": {},
}
}
@@ -457,11 +462,6 @@ func (az *Azure) DeleteBucketTagging(ctx context.Context, bucket string) error {
}
func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
if input.PartNumber != nil {
// querying an object with part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
client, err := az.getBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
@@ -486,8 +486,53 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
}
var opts *azblob.DownloadStreamOptions
if *input.Range != "" {
offset, count, isValid, err := backend.ParseObjectRange(*resp.ContentLength, *input.Range)
var partsCount *int32
var contentRange *string
if input.PartNumber != nil {
// Serve a specific part if the object has multipart upload metadata.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the
// full object with no Content-Range; any other partNumber is out of range.
if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
var startOffset int64
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length := mpMeta.Parts[partNum-1] - startOffset
var objSize int64
if resp.ContentLength != nil {
objSize = *resp.ContentLength
}
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize))
opts = &azblob.DownloadStreamOptions{
Range: blob.HTTPRange{
Offset: startOffset,
Count: length,
},
}
} else if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
// partNumber=1 on a non-multipart object: fall through and serve the
// full object without a range (opts remains nil, contentRange stays nil).
} else if *input.Range != "" {
var objSize int64
if resp.ContentLength != nil {
objSize = *resp.ContentLength
}
offset, count, isValid, err := backend.ParseObjectRange(objSize, *input.Range)
if err != nil {
return nil, err
}
@@ -498,8 +543,10 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
Offset: offset,
},
}
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", offset, offset+count-1, objSize))
}
}
blobDownloadResponse, err := az.client.DownloadStream(ctx, *input.Bucket, *input.Key, opts)
if err != nil {
return nil, azureErrToS3Err(err)
@@ -523,18 +570,14 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
LastModified: blobDownloadResponse.LastModified,
Metadata: parseAndFilterAzMetadata(blobDownloadResponse.Metadata),
TagCount: &tagcount,
ContentRange: blobDownloadResponse.ContentRange,
ContentRange: contentRange,
Body: blobDownloadResponse.Body,
StorageClass: types.StorageClassStandard,
PartsCount: partsCount,
}, nil
}
func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
if input.PartNumber != nil {
// querying an object with part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
client, err := az.getBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
@@ -563,21 +606,57 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3
size = *resp.ContentLength
}
startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
var contentRange string
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, size)
var length int64
var partsCount *int32
if input.PartNumber != nil {
// Serve a specific part if the object has multipart upload metadata.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the
// full object with no Content-Range; any other partNumber is out of range.
if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
var startOffset int64
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length = mpMeta.Parts[partNum-1] - startOffset
contentRange = fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size)
} else if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
} else {
// partNumber=1 on a non-multipart object: return full object size,
// no Content-Range, no PartsCount.
length = size
}
} else {
startOffset, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
length = lgth
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, size)
}
}
result := &s3.HeadObjectOutput{
ContentRange: &contentRange,
AcceptRanges: backend.GetPtrFromString("bytes"),
ContentLength: &length,
PartsCount: partsCount,
ContentType: resp.ContentType,
ContentEncoding: resp.ContentEncoding,
ContentLanguage: resp.ContentLanguage,
@@ -1598,7 +1677,29 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
props, err := blobClient.GetProperties(ctx, nil)
if err != nil {
return res, "", parseMpError(err)
mpErr := parseMpError(err)
// If the tmp blob is already gone, the upload may have already been
// completed. Check the final object's mp-metadata and return success
// if the upload IDs match.
if errors.Is(mpErr, s3err.GetAPIError(s3err.ErrNoSuchUpload)) {
finalClient, clientErr := az.getBlobClient(*input.Bucket, *input.Key)
if clientErr == nil {
finalProps, propErr := finalClient.GetProperties(ctx, nil)
if propErr == nil {
if mpMetaStr, ok := finalProps.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
var mpMeta backend.MpUploadMetadata
if jsonErr := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); jsonErr == nil && mpMeta.UploadID == *input.UploadId {
return s3response.CompleteMultipartUploadResult{
Bucket: input.Bucket,
Key: input.Key,
ETag: backend.GetPtrFromString(convertAzureEtag(finalProps.ETag)),
}, "", nil
}
}
}
}
}
return res, "", mpErr
}
tags, err := blobClient.GetTags(ctx, nil)
if err != nil {
@@ -1646,6 +1747,8 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
// The initial value is the lower limit of partNumber: 0
var totalSize int64
var partNumber int32
// partSizes[i] = cumulative byte offset after part i+1 (see backend.MpUploadMetadata)
var partSizes []int64
last := len(input.MultipartUpload.Parts) - 1
for i, part := range input.MultipartUpload.Parts {
if part.PartNumber == nil {
@@ -1672,6 +1775,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
// Zero-byte parts contribute no data; skip adding to blockIds.
partSizes = append(partSizes, totalSize)
continue
}
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
@@ -1686,6 +1790,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
totalSize += *block.Size
partSizes = append(partSizes, totalSize)
blockIds = append(blockIds, *block.Name)
}
@@ -1697,6 +1802,18 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
// Remove internal tracking keys from metadata before storing on the final blob.
delete(props.Metadata, string(keyMpZeroBytesParts))
// Serialize multipart metadata so GetObject/HeadObject can serve by part-number.
mpMeta := backend.MpUploadMetadata{UploadID: *input.UploadId, Parts: partSizes}
mpMetaJSON, err := json.Marshal(mpMeta)
if err != nil {
return res, "", fmt.Errorf("marshal mp metadata: %w", err)
}
mpMetaStr := string(mpMetaJSON)
if props.Metadata == nil {
props.Metadata = map[string]*string{}
}
props.Metadata[string(keyMpMetadata)] = &mpMetaStr
opts := &blockblob.CommitBlockListOptions{
Metadata: props.Metadata,
Tags: parseAzTags(tags.BlobTagSet),

View File

@@ -384,21 +384,21 @@ func isValidTagComponent(str string) bool {
return validTagComponent.Match([]byte(str))
}
func GetMultipartMD5(parts []types.CompletedPart) string {
func GetMultipartMD5(parts []types.CompletedPart) (string, error) {
var partsEtagBytes []byte
for _, part := range parts {
partsEtagBytes = append(partsEtagBytes, getEtagBytes(*part.ETag)...)
bts, err := getEtagBytes(*part.ETag)
if err != nil {
return "", fmt.Errorf("decode etag: %w", err)
}
partsEtagBytes = append(partsEtagBytes, bts...)
}
return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts))
return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)), nil
}
func getEtagBytes(etag string) []byte {
decode, err := hex.DecodeString(strings.ReplaceAll(etag, string('"'), ""))
if err != nil {
return []byte(etag)
}
return decode
func getEtagBytes(etag string) ([]byte, error) {
return hex.DecodeString(strings.ReplaceAll(etag, string('"'), ""))
}
func md5String(data []byte) string {
@@ -406,6 +406,16 @@ func md5String(data []byte) string {
return hex.EncodeToString(sum[:])
}
// MpUploadMetadata is stored alongside the final object after a multipart
// upload completes. It records the uploadId and the cumulative byte offsets
// of each part: parts[i] = sum of sizes of parts 1..i+1. This allows O(1)
// total size lookup (parts[last]) and O(1) part-start/size derivation for
// GetObject/HeadObject part-number requests.
type MpUploadMetadata struct {
UploadID string `json:"uploadId"`
Parts []int64 `json:"parts"`
}
type FileSectionReadCloser struct {
R io.Reader
F *os.File

View File

@@ -136,6 +136,7 @@ const (
deleteMarkerKey = "delete-marker"
versionIdKey = "version-id"
partCrc64nvme = "part-crc64nvme"
mpMetaKey = "mp-metadata"
nullVersionId = "null"
@@ -1690,25 +1691,69 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
return res, "", fmt.Errorf("stat bucket: %w", err)
}
sum, err := p.checkUploadIDExists(bucket, object, uploadID)
if err != nil {
return res, "", err
}
// Atomically rename the upload directory to <uploadID>.inprogress so that
// concurrent CompleteMultipartUpload calls for the same upload ID see it as
// absent and return ErrNoSuchUpload rather than racing through the rest of
// the function. If this call does not succeed we rename it back (see the
// deferred cleanup below).
// Rename the upload directory to <uploadId><ETag> to atomically claim
// the processing slot. A concurrent call with the same uploadId will compute
// the same ETag, so it will either find the directory still present (still
// processing) or gone (already completed) and react accordingly.
sum := sha256.Sum256([]byte(object))
objdirFull := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
uploadIDDir := filepath.Join(objdirFull, uploadID)
activeUploadName := uploadID + inProgressSuffix
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := backend.GetMultipartMD5(parts)
if err != nil {
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
activeUploadName := fmt.Sprintf("%s.%s%s", uploadID, strings.Trim(s3MD5, "\""), inProgressSuffix)
uploadIDInProgress := filepath.Join(objdirFull, activeUploadName)
if err := os.Rename(uploadIDDir, uploadIDInProgress); err != nil {
if errors.Is(err, fs.ErrNotExist) {
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
err = os.Rename(uploadIDDir, uploadIDInProgress)
if errors.Is(err, fs.ErrNotExist) {
// Another call already claimed this slot and is still assembling the object.
if _, statErr := os.Stat(uploadIDInProgress); statErr == nil {
// Still in progress — treat as success for idempotency.
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
return res, "", fmt.Errorf("mark upload in-progress: %w", err)
// Directory is gone: the concurrent call already completed and cleaned up.
if _, statErr := os.Stat(filepath.Join(bucket, object)); statErr == nil {
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
// Last resort: the object stat above may have lost a race with the
// concurrent call's link step. Check the mp-metadata xattr, as this
// multipart upload may have been finalized and the final object has been created
// before or by the racing request
if mpMetaBytes, statErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey); statErr == nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
return res, "", fmt.Errorf("parse object multipart metadata: %w", err)
}
// The object may have been overwritten by a newer upload or
// it's the result of a completely different multipart upload; only
// treat it as our completion if the upload IDs match.
if mpMeta.UploadID != uploadID {
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if err != nil {
return res, "", fmt.Errorf("rename upload to etag dir: %w", err)
}
// Rename sidecar metadata to match the new data directory path.
@@ -1761,11 +1806,23 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme
}
// Initialize composite checksum reader
var compositeChecksumRdr *utils.CompositeChecksumReader
if checksums.Type == types.ChecksumTypeComposite {
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm))))
if err != nil {
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
}
}
// check all parts ok
last := len(parts) - 1
var totalsize int64
// cumulative byte offsets: partSizes[i] = sum of sizes of parts 1..i+1
var partSizes []int64
var composableCsum string
// The initialie values is the lower limit of partNumber: 0
// The initial value is the lower limit of partNumber: 0
var partNumber int32
for i, part := range parts {
if part.PartNumber == nil {
@@ -1788,6 +1845,7 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
totalsize += fi.Size()
partSizes = append(partSizes, totalsize)
// all parts except the last need to be greater, than or equal to
// the minimum allowed size (5 Mib)
if i < last && fi.Size() < backend.MinPartSize {
@@ -1813,18 +1871,86 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
if err != nil {
return res, "", err
}
// Accumulate checksum state.
switch checksums.Type {
case types.ChecksumTypeFullObject:
var pcs string
if mpChecksumType != "" {
pcs = getPartChecksum(checksums.Algorithm, part)
} else {
crc64nvme, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, partCrc64nvme)
if err != nil {
return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err)
}
pcs = string(crc64nvme)
}
if i == 0 {
composableCsum = pcs
} else {
composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, pcs, fi.Size())
if err != nil {
return res, "", fmt.Errorf("add part %v checksum: %w", *part.PartNumber, err)
}
}
case types.ChecksumTypeComposite:
if err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)); err != nil {
return res, "", fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err)
}
}
}
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}
var compositeChecksumRdr *utils.CompositeChecksumReader
if checksums.Type == types.ChecksumTypeComposite {
// initialize the composite checksum reader if the mp checksum type is COMPOSITE
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm))))
if err != nil {
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
// Compute the final checksum value.
var value string
switch checksums.Type {
case types.ChecksumTypeComposite:
value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts))
case types.ChecksumTypeFullObject:
value = composableCsum
}
var crc32 *string
var crc32c *string
var sha1 *string
var sha256 *string
var crc64nvme *string
var gotSum *string
switch checksums.Algorithm {
case types.ChecksumAlgorithmCrc32:
gotSum = input.ChecksumCRC32
checksums.CRC32 = &value
crc32 = &value
case types.ChecksumAlgorithmCrc32c:
gotSum = input.ChecksumCRC32C
checksums.CRC32C = &value
crc32c = &value
case types.ChecksumAlgorithmSha1:
gotSum = input.ChecksumSHA1
checksums.SHA1 = &value
sha1 = &value
case types.ChecksumAlgorithmSha256:
gotSum = input.ChecksumSHA256
checksums.SHA256 = &value
sha256 = &value
case types.ChecksumAlgorithmCrc64nvme:
gotSum = input.ChecksumCRC64NVME
checksums.CRC64NVME = &value
crc64nvme = &value
}
// Check if the provided checksum and the calculated one are the same.
if mpChecksumType != "" && gotSum != nil {
s := *gotSum
if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") {
s = fmt.Sprintf("%s-%v", s, len(parts))
}
if s != value {
return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm)
}
}
@@ -1838,56 +1964,14 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
defer f.cleanup()
var composableCsum string
var abortOnErrSet bool
for i, part := range parts {
for _, part := range parts {
partObjPath := filepath.Join(objdir, activeUploadName, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
if err != nil {
return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
pfi, err := pf.Stat()
if err != nil {
pf.Close()
return res, "", fmt.Errorf("stat part %v: %v", *part.PartNumber, err)
}
switch checksums.Type {
case types.ChecksumTypeFullObject:
var partChecksum string
if mpChecksumType != "" {
// if any checksum has been initially specified on mp creation
// read the part checksum configuration
partChecksum = getPartChecksum(checksums.Algorithm, part)
} else {
// if no checksum has been specified on mp creation
// retrieve the internally stored crc64nvme
crc64nvme, err := p.meta.RetrieveAttribute(pf, bucket, partObjPath, partCrc64nvme)
if err != nil {
pf.Close()
return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err)
}
partChecksum = string(crc64nvme)
}
if i == 0 {
composableCsum = partChecksum
break
}
composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, partChecksum, pfi.Size())
if err != nil {
pf.Close()
return res, "", fmt.Errorf("add part %v checksum: %w",
*part.PartNumber, err)
}
case types.ChecksumTypeComposite:
err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part))
if err != nil {
pf.Close()
return res, "", fmt.Errorf("process %v part checksum: %w",
*part.PartNumber, err)
}
}
if customCopy != nil {
idemp, err := customCopy(pf, f.File())
@@ -2004,59 +2088,6 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
}
var crc32 *string
var crc32c *string
var sha1 *string
var sha256 *string
var crc64nvme *string
var value string
switch checksums.Type {
case types.ChecksumTypeComposite:
value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts))
case types.ChecksumTypeFullObject:
value = composableCsum
}
var gotSum *string
switch checksums.Algorithm {
case types.ChecksumAlgorithmCrc32:
gotSum = input.ChecksumCRC32
checksums.CRC32 = &value
crc32 = &value
case types.ChecksumAlgorithmCrc32c:
gotSum = input.ChecksumCRC32C
checksums.CRC32C = &value
crc32c = &value
case types.ChecksumAlgorithmSha1:
gotSum = input.ChecksumSHA1
checksums.SHA1 = &value
sha1 = &value
case types.ChecksumAlgorithmSha256:
gotSum = input.ChecksumSHA256
checksums.SHA256 = &value
sha256 = &value
case types.ChecksumAlgorithmCrc64nvme:
gotSum = input.ChecksumCRC64NVME
checksums.CRC64NVME = &value
crc64nvme = &value
}
// Check if the provided checksum and the calculated one are the same
if mpChecksumType != "" && gotSum != nil {
s := *gotSum
if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") {
// if number of parts is not specified in the final checksum
// make sure to add, to not fail in the final comparison
s = fmt.Sprintf("%s-%v", s, len(parts))
}
if s != value {
return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm)
}
}
err = p.storeChecksums(f.File(), bucket, object, checksums)
if err != nil {
return res, "", fmt.Errorf("store object checksum: %w", err)
@@ -2074,14 +2105,24 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := backend.GetMultipartMD5(parts)
err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
return res, "", fmt.Errorf("set etag attr: %w", err)
}
// Store multipart upload metadata on the final object so that GetObject /
// HeadObject can serve individual parts by part-number.
mpMeta := backend.MpUploadMetadata{UploadID: uploadID, Parts: partSizes}
mpMetaJSON, err := json.Marshal(mpMeta)
if err != nil {
return res, "", fmt.Errorf("marshal object multipart metadata: %w", err)
}
err = p.meta.StoreAttribute(f.File(), bucket, object, mpMetaKey, mpMetaJSON)
if err != nil {
return res, "", fmt.Errorf("set object multipart metadata: %w", err)
}
err = f.link()
if err != nil {
return res, "", fmt.Errorf("link object in namespace: %w", err)
@@ -4182,11 +4223,6 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
return nil, err
}
if input.PartNumber != nil {
// querying an object by part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
if !p.versioningEnabled() && versionId != "" {
//TODO: Maybe we need to return our custom error here?
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -4370,15 +4406,59 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
}
objSize := fi.Size()
startOffset, length, isValid, err := backend.ParseObjectRange(objSize, *input.Range)
if err != nil {
return nil, err
if fi.IsDir() {
objSize = 0
}
var contentRange string
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, objSize)
var contentRange *string
var startOffset, length int64
var partsCount *int32
// If partNumber is requested and mp-metadata exists, serve that specific part.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the full
// object with no Content-Range; any other partNumber is out of range.
// Both range read and partNumber can't be used together.
if input.PartNumber != nil {
mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
if metaErr == nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
// Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length = mpMeta.Parts[partNum-1] - startOffset
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize))
} else if errors.Is(metaErr, meta.ErrNoSuchKey) {
// Non-multipart object: partNumber=1 means the whole object; anything
// higher is out of range
if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
length = objSize
} else {
return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr)
}
} else {
start, lgth, isValid, err := backend.ParseObjectRange(objSize, getString(input.Range))
if err != nil {
return nil, err
}
startOffset, length = start, lgth
if isValid {
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, objSize))
}
}
objMeta := p.loadObjectMetaProperties(f, bucket, object, &fi)
@@ -4422,7 +4502,7 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: objMeta.Metadata,
TagCount: tagCount,
ContentRange: &contentRange,
ContentRange: contentRange,
StorageClass: types.StorageClassStandard,
VersionId: &versionId,
Body: body,
@@ -4432,6 +4512,7 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
ChecksumSHA256: checksums.SHA256,
ChecksumCRC64NVME: checksums.CRC64NVME,
ChecksumType: checksums.Type,
PartsCount: partsCount,
}, nil
}
@@ -4451,11 +4532,6 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
return nil, err
}
if input.PartNumber != nil {
// querying an object by part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
if !p.versioningEnabled() && versionId != "" {
//TODO: Maybe we need to return our custom error here?
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -4569,15 +4645,55 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
size = 0
}
startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
var contentRange *string
var startOffset, length int64
var partsCount *int32
var contentRange string
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, size)
// If partNumber is requested and mp-metadata exists, serve that specific part.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the full
// object with no Content-Range; any other partNumber is out of range.
// Both range read and partNumber can't be used together.
if input.PartNumber != nil {
mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
if metaErr == nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
// Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length = mpMeta.Parts[partNum-1] - startOffset
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size))
} else if errors.Is(metaErr, meta.ErrNoSuchKey) {
// Non-multipart object: partNumber=1 means the whole object; anything
// higher is out of range
if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
length = size
} else {
return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr)
}
} else {
start, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
startOffset, length = start, lgth
if isValid {
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, size))
}
}
var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
@@ -4622,7 +4738,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
return &s3.HeadObjectOutput{
ContentLength: &length,
AcceptRanges: backend.GetPtrFromString("bytes"),
ContentRange: &contentRange,
ContentRange: contentRange,
ContentType: objMeta.ContentType,
ContentEncoding: objMeta.ContentEncoding,
ContentDisposition: objMeta.ContentDisposition,
@@ -4644,6 +4760,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
ChecksumCRC64NVME: checksums.CRC64NVME,
ChecksumType: checksums.Type,
TagCount: tagCount,
PartsCount: partsCount,
}, nil
}

View File

@@ -460,6 +460,15 @@ func (c S3ApiController) GetObject(ctx *fiber.Ctx) (*Response, error) {
}, s3err.GetAPIError(s3err.ErrInvalidPartNumber)
}
if acceptRange != "" {
debuglogger.Logf("Range and partNumber cannot both be specified")
return &Response{
MetaOpts: &MetaOptions{
BucketOwner: parsedAcl.Owner,
},
}, s3err.GetAPIError(s3err.ErrRangeAndPartNumber)
}
partNumber = &partNumberQuery
}

View File

@@ -722,6 +722,26 @@ func TestS3ApiController_GetObject(t *testing.T) {
err: s3err.GetAPIError(s3err.ErrInvalidPartNumber),
},
},
{
name: "both partNumber and Range",
input: testInput{
locals: defaultLocals,
queries: map[string]string{
"partNumber": "2",
},
headers: map[string]string{
"Range": "bytes=10-20",
},
},
output: testOutput{
response: &Response{
MetaOpts: &MetaOptions{
BucketOwner: "root",
},
},
err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber),
},
},
{
name: "backend returns error",
input: testInput{

View File

@@ -107,6 +107,15 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) (*Response, error) {
}, s3err.GetAPIError(s3err.ErrInvalidPartNumber)
}
if objRange != "" {
debuglogger.Logf("Range and partNumber cannot both be specified")
return &Response{
MetaOpts: &MetaOptions{
BucketOwner: parsedAcl.Owner,
},
}, s3err.GetAPIError(s3err.ErrRangeAndPartNumber)
}
partNumber = &partNumberQuery
}

View File

@@ -98,6 +98,26 @@ func TestS3ApiController_HeadObject(t *testing.T) {
err: s3err.GetAPIError(s3err.ErrInvalidPartNumber),
},
},
{
name: "both partNumber and Range",
input: testInput{
locals: defaultLocals,
queries: map[string]string{
"partNumber": "6",
},
headers: map[string]string{
"Range": "bytes=1-3",
},
},
output: testOutput{
response: &Response{
MetaOpts: &MetaOptions{
BucketOwner: "root",
},
},
err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber),
},
},
{
name: "invalid checksum mode",
input: testInput{

View File

@@ -81,6 +81,8 @@ const (
ErrInvalidObjectAttributes
ErrInvalidPart
ErrInvalidPartNumber
ErrInvalidPartNumberRange
ErrRangeAndPartNumber
ErrInvalidPartOrder
ErrInvalidCompleteMpPartNumber
ErrInternalError
@@ -339,6 +341,16 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Part number must be an integer between 1 and 10000, inclusive.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidPartNumberRange: {
Code: "InvalidPartNumber",
Description: "The requested partnumber is not satisfiable.",
HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable,
},
ErrRangeAndPartNumber: {
Code: "InvalidRequest",
Description: "Cannot specify both Range header and partNumber query parameter",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidPartOrder: {
Code: "InvalidPartOrder",
Description: "The list of parts was not in ascending order. Parts must be ordered by part number.",

View File

@@ -1907,13 +1907,77 @@ func CompleteMultipartUpload_racey_success(s *S3Conf) error {
})
}
func CompleteMultipartUpload_already_completed(s *S3Conf) error {
testName := "CompleteMultipartUpload_already_completed"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
out, err := createMp(s3client, bucket, obj)
if err != nil {
return err
}
parts, _, err := uploadParts(s3client, 5*1024*1024, 1, bucket, obj, *out.UploadId)
if err != nil {
return err
}
compParts := []types.CompletedPart{}
for _, el := range parts {
compParts = append(compParts, types.CompletedPart{
ETag: el.ETag,
PartNumber: el.PartNumber,
})
}
completeInput := &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: compParts,
},
}
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
first, err := s3client.CompleteMultipartUpload(ctx, completeInput)
cancel()
if err != nil {
return err
}
// Second call with the same upload ID should also succeed (idempotent).
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
second, err := s3client.CompleteMultipartUpload(ctx, completeInput)
cancel()
if err != nil {
return err
}
if getString(second.Bucket) != bucket {
return fmt.Errorf("expected Bucket to be %s, instead got %s", bucket, getString(second.Bucket))
}
if getString(second.Key) != obj {
return fmt.Errorf("expected Key to be %s, instead got %s", obj, getString(second.Key))
}
if getString(second.ETag) != getString(first.ETag) {
return fmt.Errorf("expected ETag to be %s, instead got %s", getString(first.ETag), getString(second.ETag))
}
location := constructObjectLocation(s.endpoint, bucket, obj, s.hostStyle)
if getString(second.Location) != location {
return fmt.Errorf("expected Location to be %s, instead got %s", location, getString(second.Location))
}
return nil
})
}
// CompleteMultipartUpload_racey_data_integrity creates a single multipart
// upload, uploads its parts, then fires multiple concurrent
// CompleteMultipartUpload calls for the exact same upload ID. Exactly one
// call must succeed; the rest must fail with NoSuchUpload (the upload was
// already consumed). The surviving object must contain exactly the data that
// was uploaded. The whole sequence is repeated several times so that
// intermittent scheduling cannot hide a data-corruption bug.
// CompleteMultipartUpload calls for the exact same upload ID. All calls must
// succeed and return the same ETag (idempotent completion). The surviving
// object must contain exactly the data that was uploaded. The whole sequence
// is repeated several times so that intermittent scheduling cannot hide a
// data-corruption bug.
func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
testName := "CompleteMultipartUpload_racey_data_integrity"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
@@ -1924,7 +1988,6 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
)
obj := "my-obj"
objSize := int64(15 * 1024 * 1024) // 3 × 5 MiB parts
noSuchUpload := s3err.GetAPIError(s3err.ErrNoSuchUpload)
for iter := range iterations {
// Phase 1: create one upload and upload its parts.
@@ -1938,25 +2001,26 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
return fmt.Errorf("iteration %d: upload parts: %w", iter, err)
}
var partsEtagBytes []byte
compParts := make([]types.CompletedPart, 0, len(parts))
for _, el := range parts {
b, err := getEtagBytes(*el.ETag)
if err != nil {
return fmt.Errorf("iteration %d: %w", iter, err)
}
partsEtagBytes = append(partsEtagBytes, b...)
compParts = append(compParts, types.CompletedPart{
ETag: el.ETag,
PartNumber: el.PartNumber,
})
}
expectedETag := fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts))
// Phase 2: race concurrency complete calls for the same upload ID.
// Exactly one must succeed; the others must return NoSuchUpload.
var (
mu sync.Mutex
successCnt int
)
eg := errgroup.Group{}
for range concurrency {
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
res, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
@@ -1965,23 +2029,20 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
},
})
cancel()
if err == nil {
mu.Lock()
successCnt++
mu.Unlock()
return nil
if err != nil {
return err
}
// Every non-zero error must be NoSuchUpload.
return checkApiErr(err, noSuchUpload)
if getString(res.ETag) != expectedETag {
return fmt.Errorf("expected the multipart upload ETag to be %s, instead got %s", expectedETag, getString(res.ETag))
}
return nil
})
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("iteration %d: complete phase: %w", iter, err)
}
if successCnt != 1 {
return fmt.Errorf("iteration %d: expected exactly 1 successful complete, got %d",
iter, successCnt)
}
// Phase 3: download the object and verify it is complete and
// uncorrupted — its checksum must match what was uploaded.

View File

@@ -15,6 +15,7 @@
package integration
import (
"bytes"
"context"
"crypto/sha256"
"errors"
@@ -1256,17 +1257,246 @@ func GetObject_invalid_part_number(s *S3Conf) error {
})
}
func GetObject_part_number_not_supported(s *S3Conf) error {
testName := "GetObject_part_number_not_supported"
func GetObject_range_and_part_number(s *S3Conf) error {
testName := "GetObject_range_and_part_number"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
defer cancel()
_, err := s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: getPtr("obj"),
PartNumber: getPtr(int32(3)),
})
obj := "my-obj"
_, err := putObjectWithData(100, &s3.PutObjectInput{
Bucket: &bucket,
Key: &obj,
}, s3client)
if err != nil {
return err
}
return checkApiErr(err, s3err.GetAPIError(s3err.ErrNotImplemented))
pn := int32(1)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &obj,
Range: getPtr("bytes=0-9"),
PartNumber: &pn,
})
cancel()
return checkApiErr(err, s3err.GetAPIError(s3err.ErrRangeAndPartNumber))
})
}
func GetObject_mp_part_number_exceeds_parts_count(s *S3Conf) error {
testName := "GetObject_mp_part_number_exceeds_parts_count"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
out, err := createMp(s3client, bucket, obj)
if err != nil {
return err
}
const partCount = int64(5)
parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId)
if err != nil {
return err
}
compParts := make([]types.CompletedPart, len(parts))
for i, p := range parts {
compParts[i] = types.CompletedPart{
ETag: p.ETag,
PartNumber: p.PartNumber,
}
}
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: compParts,
},
})
cancel()
if err != nil {
return err
}
// partNumber exceeds the number of parts in the completed upload
pn := int32(partCount + 1)
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &obj,
PartNumber: &pn,
})
cancel()
return checkApiErr(err, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange))
})
}
func GetObject_mp_part_number_success(s *S3Conf) error {
testName := "GetObject_mp_part_number_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
out, err := createMp(s3client, bucket, obj)
if err != nil {
return err
}
const partCount = 3
const partSize = int64(5 * 1024 * 1024)
const totalSize = int64(partCount) * partSize
// Upload parts manually to capture per-part data for integrity checks
partNumbers := make([]int32, partCount)
partChecksums := make([][32]byte, partCount)
compParts := make([]types.CompletedPart, partCount)
for i := range partCount {
partNumbers[i] = int32(i + 1)
buf := make([]byte, partSize)
for j := range buf {
buf[j] = byte(i*17 + j%251)
}
partChecksums[i] = sha256.Sum256(buf)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.UploadPart(ctx, &s3.UploadPartInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
Body: bytes.NewReader(buf),
PartNumber: &partNumbers[i],
})
cancel()
if err != nil {
return fmt.Errorf("upload part %d: %w", partNumbers[i], err)
}
compParts[i] = types.CompletedPart{
ETag: res.ETag,
PartNumber: &partNumbers[i],
}
}
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: compParts,
},
})
cancel()
if err != nil {
return err
}
for i := range partCount {
pn := int32(i + 1)
startByte := int64(i) * partSize
endByte := startByte + partSize - 1
expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize)
ctx, cancel := context.WithTimeout(context.Background(), longTimeout)
res, err := s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &obj,
PartNumber: &pn,
})
if err != nil {
cancel()
return fmt.Errorf("part %d: %w", pn, err)
}
if res.PartsCount == nil {
cancel()
return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn)
}
if *res.PartsCount != int32(partCount) {
cancel()
return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount)
}
if getString(res.ContentRange) != expectedContentRange {
cancel()
return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange))
}
if res.ContentLength == nil || *res.ContentLength != partSize {
cancel()
return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength)
}
if getString(res.AcceptRanges) != "bytes" {
cancel()
return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges))
}
body, readErr := io.ReadAll(res.Body)
cancel()
res.Body.Close()
if readErr != nil {
return fmt.Errorf("part %d: read body: %w", pn, readErr)
}
gotSum := sha256.Sum256(body)
if gotSum != partChecksums[i] {
return fmt.Errorf("part %d: data integrity check failed: body checksum mismatch", pn)
}
}
return nil
})
}
func GetObject_non_mp_part_number_1_success(s *S3Conf) error {
testName := "GetObject_non_mp_part_number_1_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "put-object-part1"
const objSize = int64(1234)
out, err := putObjectWithData(objSize, &s3.PutObjectInput{
Bucket: &bucket,
Key: &obj,
}, s3client)
if err != nil {
return err
}
pn := int32(1)
ctx, cancel := context.WithTimeout(context.Background(), longTimeout)
res, err := s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &obj,
PartNumber: &pn,
})
if err != nil {
cancel()
return err
}
if res.ContentLength == nil || *res.ContentLength != objSize {
cancel()
return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength)
}
if getString(res.ContentRange) != "" {
cancel()
return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange))
}
if res.PartsCount != nil {
cancel()
return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount)
}
if getString(res.AcceptRanges) != "bytes" {
cancel()
return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges))
}
body, readErr := io.ReadAll(res.Body)
cancel()
res.Body.Close()
if readErr != nil {
return fmt.Errorf("read body: %w", readErr)
}
if gotSum := sha256.Sum256(body); gotSum != out.csum {
return fmt.Errorf("data integrity check failed: body checksum mismatch")
}
return nil
})
}

View File

@@ -62,21 +62,6 @@ func HeadObject_invalid_part_number(s *S3Conf) error {
})
}
func HeadObject_part_number_not_supported(s *S3Conf) error {
testName := "HeadObject_part_number_not_supported"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
partNumber := int32(4)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &bucket,
Key: getPtr("my-obj"),
PartNumber: &partNumber,
})
cancel()
return checkSdkApiErr(err, "NotImplemented")
})
}
func HeadObject_non_existing_dir_object(s *S3Conf) error {
testName := "HeadObject_non_existing_dir_object"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
@@ -930,6 +915,202 @@ func HeadObject_overrides_presign_success(s *S3Conf) error {
})
}
func HeadObject_range_and_part_number(s *S3Conf) error {
testName := "HeadObject_range_and_part_number"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
_, err := putObjectWithData(100, &s3.PutObjectInput{
Bucket: &bucket,
Key: &obj,
}, s3client)
if err != nil {
return err
}
pn := int32(1)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &bucket,
Key: &obj,
Range: getPtr("bytes=0-9"),
PartNumber: &pn,
})
cancel()
return checkSdkApiErr(err, "BadRequest")
})
}
func HeadObject_mp_part_number_exceeds_parts_count(s *S3Conf) error {
testName := "HeadObject_mp_part_number_exceeds_parts_count"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
out, err := createMp(s3client, bucket, obj)
if err != nil {
return err
}
const partCount = int64(5)
parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId)
if err != nil {
return err
}
compParts := make([]types.CompletedPart, len(parts))
for i, p := range parts {
compParts[i] = types.CompletedPart{
ETag: p.ETag,
PartNumber: p.PartNumber,
}
}
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: compParts,
},
})
cancel()
if err != nil {
return err
}
// partNumber exceeds the number of parts in the completed upload
pn := int32(partCount + 1)
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &bucket,
Key: &obj,
PartNumber: &pn,
})
cancel()
return checkSdkApiErr(err, "RequestedRangeNotSatisfiable")
})
}
func HeadObject_mp_part_number_success(s *S3Conf) error {
testName := "HeadObject_mp_part_number_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
out, err := createMp(s3client, bucket, obj)
if err != nil {
return err
}
const partCount = int64(3)
const partSize = int64(5 * 1024 * 1024)
const totalSize = partCount * partSize
parts, _, err := uploadParts(s3client, totalSize, partCount, bucket, obj, *out.UploadId)
if err != nil {
return err
}
compParts := make([]types.CompletedPart, len(parts))
for i, p := range parts {
compParts[i] = types.CompletedPart{
ETag: p.ETag,
PartNumber: p.PartNumber,
}
}
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: compParts,
},
})
cancel()
if err != nil {
return err
}
for i := range partCount {
pn := int32(i + 1)
startByte := i * partSize
endByte := startByte + partSize - 1
expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &bucket,
Key: &obj,
PartNumber: &pn,
})
cancel()
if err != nil {
return fmt.Errorf("part %d: %w", pn, err)
}
if res.PartsCount == nil {
return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn)
}
if *res.PartsCount != int32(partCount) {
return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount)
}
if getString(res.ContentRange) != expectedContentRange {
return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange))
}
if res.ContentLength == nil || *res.ContentLength != partSize {
return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength)
}
if getString(res.AcceptRanges) != "bytes" {
return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges))
}
}
return nil
})
}
func HeadObject_non_mp_part_number_1_success(s *S3Conf) error {
testName := "HeadObject_non_mp_part_number_1_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "put-object-part1"
const objSize = int64(1234)
_, err := putObjectWithData(objSize, &s3.PutObjectInput{
Bucket: &bucket,
Key: &obj,
}, s3client)
if err != nil {
return err
}
pn := int32(1)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &bucket,
Key: &obj,
PartNumber: &pn,
})
cancel()
if err != nil {
return err
}
if res.ContentLength == nil || *res.ContentLength != objSize {
return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength)
}
if getString(res.ContentRange) != "" {
return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange))
}
if res.PartsCount != nil {
return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount)
}
if getString(res.AcceptRanges) != "bytes" {
return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges))
}
return nil
})
}
func HeadObject_overrides_fail_public(s *S3Conf) error {
testName := "HeadObject_overrides_fail_public"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {

View File

@@ -201,7 +201,6 @@ func TestPutObject(ts *TestState) {
func TestHeadObject(ts *TestState) {
ts.Run(HeadObject_non_existing_object)
ts.Run(HeadObject_invalid_part_number)
ts.Run(HeadObject_part_number_not_supported)
ts.Run(HeadObject_directory_object_noslash)
ts.Run(HeadObject_non_existing_dir_object)
ts.Run(HeadObject_invalid_parent_dir)
@@ -218,6 +217,10 @@ func TestHeadObject(ts *TestState) {
ts.Run(HeadObject_overrides_success)
ts.Run(HeadObject_overrides_presign_success)
ts.Run(HeadObject_overrides_fail_public)
ts.Run(HeadObject_range_and_part_number)
ts.Run(HeadObject_mp_part_number_exceeds_parts_count)
ts.Run(HeadObject_mp_part_number_success)
ts.Run(HeadObject_non_mp_part_number_1_success)
}
func TestGetObjectAttributes(ts *TestState) {
@@ -256,7 +259,10 @@ func TestGetObject(ts *TestState) {
ts.Run(GetObject_overrides_presign_success)
ts.Run(GetObject_overrides_fail_public)
ts.Run(GetObject_invalid_part_number)
ts.Run(GetObject_part_number_not_supported)
ts.Run(GetObject_range_and_part_number)
ts.Run(GetObject_mp_part_number_exceeds_parts_count)
ts.Run(GetObject_mp_part_number_success)
ts.Run(GetObject_non_mp_part_number_1_success)
}
func TestListObjects(ts *TestState) {
@@ -532,6 +538,7 @@ func TestCompleteMultipartUpload(ts *TestState) {
ts.Run(CompleteMultipartUpload_with_metadata)
}
ts.Run(CompleteMultipartUpload_success)
ts.Run(CompleteMultipartUpload_already_completed)
if !ts.conf.azureTests {
ts.Run(CompleteMultipartUpload_racey_success)
ts.Run(CompleteMultipartUpload_racey_data_integrity)
@@ -1378,7 +1385,6 @@ func GetIntTests() IntTests {
"PutObject_racey_success": PutObject_racey_success,
"HeadObject_non_existing_object": HeadObject_non_existing_object,
"HeadObject_invalid_part_number": HeadObject_invalid_part_number,
"HeadObject_part_number_not_supported": HeadObject_part_number_not_supported,
"HeadObject_directory_object_noslash": HeadObject_directory_object_noslash,
"HeadObject_non_existing_dir_object": HeadObject_non_existing_dir_object,
"HeadObject_name_too_long": HeadObject_name_too_long,
@@ -1393,6 +1399,10 @@ func GetIntTests() IntTests {
"HeadObject_overrides_success": HeadObject_overrides_success,
"HeadObject_overrides_presign_success": HeadObject_overrides_presign_success,
"HeadObject_overrides_fail_public": HeadObject_overrides_fail_public,
"HeadObject_range_and_part_number": HeadObject_range_and_part_number,
"HeadObject_mp_part_number_exceeds_parts_count": HeadObject_mp_part_number_exceeds_parts_count,
"HeadObject_mp_part_number_success": HeadObject_mp_part_number_success,
"HeadObject_non_mp_part_number_1_success": HeadObject_non_mp_part_number_1_success,
"GetObjectAttributes_non_existing_bucket": GetObjectAttributes_non_existing_bucket,
"GetObjectAttributes_non_existing_object": GetObjectAttributes_non_existing_object,
"GetObjectAttributes_invalid_attrs": GetObjectAttributes_invalid_attrs,
@@ -1419,7 +1429,10 @@ func GetIntTests() IntTests {
"GetObject_overrides_presign_success": GetObject_overrides_presign_success,
"GetObject_overrides_fail_public": GetObject_overrides_fail_public,
"GetObject_invalid_part_number": GetObject_invalid_part_number,
"GetObject_part_number_not_supported": GetObject_part_number_not_supported,
"GetObject_range_and_part_number": GetObject_range_and_part_number,
"GetObject_mp_part_number_exceeds_parts_count": GetObject_mp_part_number_exceeds_parts_count,
"GetObject_mp_part_number_success": GetObject_mp_part_number_success,
"GetObject_non_mp_part_number_1_success": GetObject_non_mp_part_number_1_success,
"ListObjects_non_existing_bucket": ListObjects_non_existing_bucket,
"ListObjects_with_prefix": ListObjects_with_prefix,
"ListObjects_truncated": ListObjects_truncated,
@@ -1612,6 +1625,7 @@ func GetIntTests() IntTests {
"CompleteMultipartUpload_should_ignore_the_final_checksum": CompleteMultipartUpload_should_ignore_the_final_checksum,
"CompleteMultipartUpload_should_succeed_without_final_checksum_type": CompleteMultipartUpload_should_succeed_without_final_checksum_type,
"CompleteMultipartUpload_success": CompleteMultipartUpload_success,
"CompleteMultipartUpload_already_completed": CompleteMultipartUpload_already_completed,
"CompleteMultipartUpload_racey_success": CompleteMultipartUpload_racey_success,
"CompleteMultipartUpload_racey_data_integrity": CompleteMultipartUpload_racey_data_integrity,
"PutBucketAcl_non_existing_bucket": PutBucketAcl_non_existing_bucket,

View File

@@ -2700,3 +2700,12 @@ func sendPostObject(input PostRequestConfig) (*http.Response, error) {
return input.s3Conf.httpClient.Do(req)
}
func getEtagBytes(etag string) ([]byte, error) {
return hex.DecodeString(strings.ReplaceAll(etag, string('"'), ""))
}
func md5String(data []byte) string {
sum := md5.Sum(data)
return hex.EncodeToString(sum[:])
}