Merge pull request #2054 from versity/sis/racing-complete-mp

fix: make CompleteMultipartUpload idempotent and add part-number support to GetObject/HeadObject
This commit is contained in:
Ben McClelland
2026-04-20 10:23:09 -07:00
committed by GitHub
13 changed files with 1041 additions and 232 deletions

View File

@@ -73,6 +73,10 @@ const (
// keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata.
// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here.
keyMpZeroBytesParts key = "Zerobytesparts"
// keyMpMetadata stores multipart upload part-offset metadata on the final
// committed blob so that GetObject/HeadObject can serve individual parts
// by part-number.
keyMpMetadata key = "Mpmetadata"
defaultListingMaxKeys = 1000
)
@@ -89,6 +93,7 @@ func (key) Table() map[string]struct{} {
"objectlegalhold": {},
"objname": {},
".sgwtmp/multipart": {},
"mpmetadata": {},
}
}
@@ -463,11 +468,6 @@ func (az *Azure) DeleteBucketTagging(ctx context.Context, bucket string) error {
}
func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
if input.PartNumber != nil {
// querying an object with part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
client, err := az.getBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
@@ -492,8 +492,53 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
}
var opts *azblob.DownloadStreamOptions
if *input.Range != "" {
offset, count, isValid, err := backend.ParseObjectRange(*resp.ContentLength, *input.Range)
var partsCount *int32
var contentRange *string
if input.PartNumber != nil {
// Serve a specific part if the object has multipart upload metadata.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the
// full object with no Content-Range; any other partNumber is out of range.
if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
var startOffset int64
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length := mpMeta.Parts[partNum-1] - startOffset
var objSize int64
if resp.ContentLength != nil {
objSize = *resp.ContentLength
}
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize))
opts = &azblob.DownloadStreamOptions{
Range: blob.HTTPRange{
Offset: startOffset,
Count: length,
},
}
} else if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
// partNumber=1 on a non-multipart object: fall through and serve the
// full object without a range (opts remains nil, contentRange stays nil).
} else if *input.Range != "" {
var objSize int64
if resp.ContentLength != nil {
objSize = *resp.ContentLength
}
offset, count, isValid, err := backend.ParseObjectRange(objSize, *input.Range)
if err != nil {
return nil, err
}
@@ -504,8 +549,10 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
Offset: offset,
},
}
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", offset, offset+count-1, objSize))
}
}
blobDownloadResponse, err := az.client.DownloadStream(ctx, *input.Bucket, *input.Key, opts)
if err != nil {
return nil, azureErrToS3Err(err)
@@ -529,18 +576,14 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
LastModified: blobDownloadResponse.LastModified,
Metadata: parseAndFilterAzMetadata(blobDownloadResponse.Metadata),
TagCount: &tagcount,
ContentRange: blobDownloadResponse.ContentRange,
ContentRange: contentRange,
Body: blobDownloadResponse.Body,
StorageClass: types.StorageClassStandard,
PartsCount: partsCount,
}, nil
}
func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
if input.PartNumber != nil {
// querying an object with part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
client, err := az.getBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
@@ -569,21 +612,57 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3
size = *resp.ContentLength
}
startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
var contentRange string
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, size)
var length int64
var partsCount *int32
if input.PartNumber != nil {
// Serve a specific part if the object has multipart upload metadata.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the
// full object with no Content-Range; any other partNumber is out of range.
if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
var startOffset int64
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length = mpMeta.Parts[partNum-1] - startOffset
contentRange = fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size)
} else if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
} else {
// partNumber=1 on a non-multipart object: return full object size,
// no Content-Range, no PartsCount.
length = size
}
} else {
startOffset, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
length = lgth
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, size)
}
}
result := &s3.HeadObjectOutput{
ContentRange: &contentRange,
AcceptRanges: backend.GetPtrFromString("bytes"),
ContentLength: &length,
PartsCount: partsCount,
ContentType: resp.ContentType,
ContentEncoding: resp.ContentEncoding,
ContentLanguage: resp.ContentLanguage,
@@ -1680,7 +1759,29 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
props, err := blobClient.GetProperties(ctx, nil)
if err != nil {
return res, "", parseMpError(err)
mpErr := parseMpError(err)
// If the tmp blob is already gone, the upload may have already been
// completed. Check the final object's mp-metadata and return success
// if the upload IDs match.
if errors.Is(mpErr, s3err.GetAPIError(s3err.ErrNoSuchUpload)) {
finalClient, clientErr := az.getBlobClient(*input.Bucket, *input.Key)
if clientErr == nil {
finalProps, propErr := finalClient.GetProperties(ctx, nil)
if propErr == nil {
if mpMetaStr, ok := finalProps.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
var mpMeta backend.MpUploadMetadata
if jsonErr := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); jsonErr == nil && mpMeta.UploadID == *input.UploadId {
return s3response.CompleteMultipartUploadResult{
Bucket: input.Bucket,
Key: input.Key,
ETag: backend.GetPtrFromString(convertAzureEtag(finalProps.ETag)),
}, "", nil
}
}
}
}
}
return res, "", mpErr
}
tags, err := blobClient.GetTags(ctx, nil)
if err != nil {
@@ -1728,6 +1829,8 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
// The initial value is the lower limit of partNumber: 0
var totalSize int64
var partNumber int32
// partSizes[i] = cumulative byte offset after part i+1 (see backend.MpUploadMetadata)
var partSizes []int64
last := len(input.MultipartUpload.Parts) - 1
for i, part := range input.MultipartUpload.Parts {
if part.PartNumber == nil {
@@ -1754,6 +1857,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
// Zero-byte parts contribute no data; skip adding to blockIds.
partSizes = append(partSizes, totalSize)
continue
}
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
@@ -1768,6 +1872,7 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
totalSize += *block.Size
partSizes = append(partSizes, totalSize)
blockIds = append(blockIds, *block.Name)
}
@@ -1779,6 +1884,18 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
// Remove internal tracking keys from metadata before storing on the final blob.
delete(props.Metadata, string(keyMpZeroBytesParts))
// Serialize multipart metadata so GetObject/HeadObject can serve by part-number.
mpMeta := backend.MpUploadMetadata{UploadID: *input.UploadId, Parts: partSizes}
mpMetaJSON, err := json.Marshal(mpMeta)
if err != nil {
return res, "", fmt.Errorf("marshal mp metadata: %w", err)
}
mpMetaStr := string(mpMetaJSON)
if props.Metadata == nil {
props.Metadata = map[string]*string{}
}
props.Metadata[string(keyMpMetadata)] = &mpMetaStr
opts := &blockblob.CommitBlockListOptions{
Metadata: props.Metadata,
Tags: parseAzTags(tags.BlobTagSet),

View File

@@ -384,21 +384,21 @@ func isValidTagComponent(str string) bool {
return validTagComponent.Match([]byte(str))
}
func GetMultipartMD5(parts []types.CompletedPart) string {
func GetMultipartMD5(parts []types.CompletedPart) (string, error) {
var partsEtagBytes []byte
for _, part := range parts {
partsEtagBytes = append(partsEtagBytes, getEtagBytes(*part.ETag)...)
bts, err := getEtagBytes(*part.ETag)
if err != nil {
return "", fmt.Errorf("decode etag: %w", err)
}
partsEtagBytes = append(partsEtagBytes, bts...)
}
return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts))
return fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts)), nil
}
func getEtagBytes(etag string) []byte {
decode, err := hex.DecodeString(strings.ReplaceAll(etag, string('"'), ""))
if err != nil {
return []byte(etag)
}
return decode
func getEtagBytes(etag string) ([]byte, error) {
return hex.DecodeString(strings.ReplaceAll(etag, string('"'), ""))
}
func md5String(data []byte) string {
@@ -406,6 +406,16 @@ func md5String(data []byte) string {
return hex.EncodeToString(sum[:])
}
// MpUploadMetadata is stored alongside the final object after a multipart
// upload completes. It records the uploadId and the cumulative byte offsets
// of each part: parts[i] = sum of sizes of parts 1..i+1. This allows O(1)
// total size lookup (parts[last]) and O(1) part-start/size derivation for
// GetObject/HeadObject part-number requests.
type MpUploadMetadata struct {
// UploadID is the multipart uploadId that produced the final object. It is
// compared against the caller's uploadId to detect an idempotent
// CompleteMultipartUpload retry for the same upload.
UploadID string `json:"uploadId"`
// Parts holds cumulative byte offsets: Parts[i] is the end offset (sum of
// sizes) of parts 1..i+1, so part N spans [Parts[N-2], Parts[N-1]).
Parts []int64 `json:"parts"`
}
type FileSectionReadCloser struct {
R io.Reader
F *os.File

View File

@@ -136,6 +136,7 @@ const (
deleteMarkerKey = "delete-marker"
versionIdKey = "version-id"
partCrc64nvme = "part-crc64nvme"
mpMetaKey = "mp-metadata"
nullVersionId = "null"
@@ -1690,25 +1691,69 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
return res, "", fmt.Errorf("stat bucket: %w", err)
}
sum, err := p.checkUploadIDExists(bucket, object, uploadID)
if err != nil {
return res, "", err
}
// Atomically rename the upload directory to <uploadID>.inprogress so that
// concurrent CompleteMultipartUpload calls for the same upload ID see it as
// absent and return ErrNoSuchUpload rather than racing through the rest of
// the function. If this call does not succeed we rename it back (see the
// deferred cleanup below).
// Rename the upload directory to <uploadId><ETag> to atomically claim
// the processing slot. A concurrent call with the same uploadId will compute
// the same ETag, so it will either find the directory still present (still
// processing) or gone (already completed) and react accordingly.
sum := sha256.Sum256([]byte(object))
objdirFull := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
uploadIDDir := filepath.Join(objdirFull, uploadID)
activeUploadName := uploadID + inProgressSuffix
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := backend.GetMultipartMD5(parts)
if err != nil {
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
activeUploadName := fmt.Sprintf("%s.%s%s", uploadID, strings.Trim(s3MD5, "\""), inProgressSuffix)
uploadIDInProgress := filepath.Join(objdirFull, activeUploadName)
if err := os.Rename(uploadIDDir, uploadIDInProgress); err != nil {
if errors.Is(err, fs.ErrNotExist) {
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
err = os.Rename(uploadIDDir, uploadIDInProgress)
if errors.Is(err, fs.ErrNotExist) {
// Another call already claimed this slot and is still assembling the object.
if _, statErr := os.Stat(uploadIDInProgress); statErr == nil {
// Still in progress — treat as success for idempotency.
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
return res, "", fmt.Errorf("mark upload in-progress: %w", err)
// Directory is gone: the concurrent call already completed and cleaned up.
if _, statErr := os.Stat(filepath.Join(bucket, object)); statErr == nil {
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
// Last resort: the object stat above may have lost a race with the
// concurrent call's link step. Check the mp-metadata xattr, since this
// multipart upload may already have been finalized and the final object
// created before, or by, the racing request.
if mpMetaBytes, statErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey); statErr == nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
return res, "", fmt.Errorf("parse object multipart metadata: %w", err)
}
// The object may have been overwritten by a newer upload or
// it's the result of a completely different multipart upload; only
// treat it as our completion if the upload IDs match.
if mpMeta.UploadID != uploadID {
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if err != nil {
return res, "", fmt.Errorf("rename upload to etag dir: %w", err)
}
// Rename sidecar metadata to match the new data directory path.
@@ -1761,11 +1806,23 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme
}
// Initialize composite checksum reader
var compositeChecksumRdr *utils.CompositeChecksumReader
if checksums.Type == types.ChecksumTypeComposite {
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm))))
if err != nil {
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
}
}
// check all parts ok
last := len(parts) - 1
var totalsize int64
// cumulative byte offsets: partSizes[i] = sum of sizes of parts 1..i+1
var partSizes []int64
var composableCsum string
// The initialie values is the lower limit of partNumber: 0
// The initial value is the lower limit of partNumber: 0
var partNumber int32
for i, part := range parts {
if part.PartNumber == nil {
@@ -1788,6 +1845,7 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
totalsize += fi.Size()
partSizes = append(partSizes, totalsize)
// all parts except the last need to be greater than or equal to
// the minimum allowed size (5 MiB)
if i < last && fi.Size() < backend.MinPartSize {
@@ -1813,18 +1871,86 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
if err != nil {
return res, "", err
}
// Accumulate checksum state.
switch checksums.Type {
case types.ChecksumTypeFullObject:
var pcs string
if mpChecksumType != "" {
pcs = getPartChecksum(checksums.Algorithm, part)
} else {
crc64nvme, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, partCrc64nvme)
if err != nil {
return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err)
}
pcs = string(crc64nvme)
}
if i == 0 {
composableCsum = pcs
} else {
composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, pcs, fi.Size())
if err != nil {
return res, "", fmt.Errorf("add part %v checksum: %w", *part.PartNumber, err)
}
}
case types.ChecksumTypeComposite:
if err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)); err != nil {
return res, "", fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err)
}
}
}
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}
var compositeChecksumRdr *utils.CompositeChecksumReader
if checksums.Type == types.ChecksumTypeComposite {
// initialize the composite checksum reader if the mp checksum type is COMPOSITE
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm))))
if err != nil {
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
// Compute the final checksum value.
var value string
switch checksums.Type {
case types.ChecksumTypeComposite:
value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts))
case types.ChecksumTypeFullObject:
value = composableCsum
}
var crc32 *string
var crc32c *string
var sha1 *string
var sha256 *string
var crc64nvme *string
var gotSum *string
switch checksums.Algorithm {
case types.ChecksumAlgorithmCrc32:
gotSum = input.ChecksumCRC32
checksums.CRC32 = &value
crc32 = &value
case types.ChecksumAlgorithmCrc32c:
gotSum = input.ChecksumCRC32C
checksums.CRC32C = &value
crc32c = &value
case types.ChecksumAlgorithmSha1:
gotSum = input.ChecksumSHA1
checksums.SHA1 = &value
sha1 = &value
case types.ChecksumAlgorithmSha256:
gotSum = input.ChecksumSHA256
checksums.SHA256 = &value
sha256 = &value
case types.ChecksumAlgorithmCrc64nvme:
gotSum = input.ChecksumCRC64NVME
checksums.CRC64NVME = &value
crc64nvme = &value
}
// Check if the provided checksum and the calculated one are the same.
if mpChecksumType != "" && gotSum != nil {
s := *gotSum
if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") {
s = fmt.Sprintf("%s-%v", s, len(parts))
}
if s != value {
return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm)
}
}
@@ -1838,56 +1964,14 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
defer f.cleanup()
var composableCsum string
var abortOnErrSet bool
for i, part := range parts {
for _, part := range parts {
partObjPath := filepath.Join(objdir, activeUploadName, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
if err != nil {
return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
pfi, err := pf.Stat()
if err != nil {
pf.Close()
return res, "", fmt.Errorf("stat part %v: %v", *part.PartNumber, err)
}
switch checksums.Type {
case types.ChecksumTypeFullObject:
var partChecksum string
if mpChecksumType != "" {
// if any checksum has been initially specified on mp creation
// read the part checksum configuration
partChecksum = getPartChecksum(checksums.Algorithm, part)
} else {
// if no checksum has been specified on mp creation
// retrieve the internally stored crc64nvme
crc64nvme, err := p.meta.RetrieveAttribute(pf, bucket, partObjPath, partCrc64nvme)
if err != nil {
pf.Close()
return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err)
}
partChecksum = string(crc64nvme)
}
if i == 0 {
composableCsum = partChecksum
break
}
composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, partChecksum, pfi.Size())
if err != nil {
pf.Close()
return res, "", fmt.Errorf("add part %v checksum: %w",
*part.PartNumber, err)
}
case types.ChecksumTypeComposite:
err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part))
if err != nil {
pf.Close()
return res, "", fmt.Errorf("process %v part checksum: %w",
*part.PartNumber, err)
}
}
if customCopy != nil {
idemp, err := customCopy(pf, f.File())
@@ -2004,59 +2088,6 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
}
var crc32 *string
var crc32c *string
var sha1 *string
var sha256 *string
var crc64nvme *string
var value string
switch checksums.Type {
case types.ChecksumTypeComposite:
value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts))
case types.ChecksumTypeFullObject:
value = composableCsum
}
var gotSum *string
switch checksums.Algorithm {
case types.ChecksumAlgorithmCrc32:
gotSum = input.ChecksumCRC32
checksums.CRC32 = &value
crc32 = &value
case types.ChecksumAlgorithmCrc32c:
gotSum = input.ChecksumCRC32C
checksums.CRC32C = &value
crc32c = &value
case types.ChecksumAlgorithmSha1:
gotSum = input.ChecksumSHA1
checksums.SHA1 = &value
sha1 = &value
case types.ChecksumAlgorithmSha256:
gotSum = input.ChecksumSHA256
checksums.SHA256 = &value
sha256 = &value
case types.ChecksumAlgorithmCrc64nvme:
gotSum = input.ChecksumCRC64NVME
checksums.CRC64NVME = &value
crc64nvme = &value
}
// Check if the provided checksum and the calculated one are the same
if mpChecksumType != "" && gotSum != nil {
s := *gotSum
if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") {
// if number of parts is not specified in the final checksum
// make sure to add, to not fail in the final comparison
s = fmt.Sprintf("%s-%v", s, len(parts))
}
if s != value {
return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm)
}
}
err = p.storeChecksums(f.File(), bucket, object, checksums)
if err != nil {
return res, "", fmt.Errorf("store object checksum: %w", err)
@@ -2074,14 +2105,24 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
}
}
// Calculate s3 compatible md5sum for complete multipart.
s3MD5 := backend.GetMultipartMD5(parts)
err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
return res, "", fmt.Errorf("set etag attr: %w", err)
}
// Store multipart upload metadata on the final object so that GetObject /
// HeadObject can serve individual parts by part-number.
mpMeta := backend.MpUploadMetadata{UploadID: uploadID, Parts: partSizes}
mpMetaJSON, err := json.Marshal(mpMeta)
if err != nil {
return res, "", fmt.Errorf("marshal object multipart metadata: %w", err)
}
err = p.meta.StoreAttribute(f.File(), bucket, object, mpMetaKey, mpMetaJSON)
if err != nil {
return res, "", fmt.Errorf("set object multipart metadata: %w", err)
}
err = f.link()
if err != nil {
return res, "", fmt.Errorf("link object in namespace: %w", err)
@@ -4182,11 +4223,6 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
return nil, err
}
if input.PartNumber != nil {
// querying an object by part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
if !p.versioningEnabled() && versionId != "" {
//TODO: Maybe we need to return our custom error here?
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -4370,15 +4406,59 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
}
objSize := fi.Size()
startOffset, length, isValid, err := backend.ParseObjectRange(objSize, *input.Range)
if err != nil {
return nil, err
if fi.IsDir() {
objSize = 0
}
var contentRange string
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, objSize)
var contentRange *string
var startOffset, length int64
var partsCount *int32
// If partNumber is requested and mp-metadata exists, serve that specific part.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the full
// object with no Content-Range; any other partNumber is out of range.
// Both range read and partNumber can't be used together.
if input.PartNumber != nil {
mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
if metaErr == nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
// Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length = mpMeta.Parts[partNum-1] - startOffset
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize))
} else if errors.Is(metaErr, meta.ErrNoSuchKey) {
// Non-multipart object: partNumber=1 means the whole object; anything
// higher is out of range
if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
length = objSize
} else {
return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr)
}
} else {
start, lgth, isValid, err := backend.ParseObjectRange(objSize, getString(input.Range))
if err != nil {
return nil, err
}
startOffset, length = start, lgth
if isValid {
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, objSize))
}
}
objMeta := p.loadObjectMetaProperties(f, bucket, object, &fi)
@@ -4422,7 +4502,7 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
LastModified: backend.GetTimePtr(fi.ModTime()),
Metadata: objMeta.Metadata,
TagCount: tagCount,
ContentRange: &contentRange,
ContentRange: contentRange,
StorageClass: types.StorageClassStandard,
VersionId: &versionId,
Body: body,
@@ -4432,6 +4512,7 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
ChecksumSHA256: checksums.SHA256,
ChecksumCRC64NVME: checksums.CRC64NVME,
ChecksumType: checksums.Type,
PartsCount: partsCount,
}, nil
}
@@ -4451,11 +4532,6 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
return nil, err
}
if input.PartNumber != nil {
// querying an object by part number is not supported
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
if !p.versioningEnabled() && versionId != "" {
//TODO: Maybe we need to return our custom error here?
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
@@ -4569,15 +4645,55 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
size = 0
}
startOffset, length, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
var contentRange *string
var startOffset, length int64
var partsCount *int32
var contentRange string
if isValid {
contentRange = fmt.Sprintf("bytes %v-%v/%v",
startOffset, startOffset+length-1, size)
// If partNumber is requested and mp-metadata exists, serve that specific part.
// For non-multipart objects (no mp-metadata), partNumber=1 returns the full
// object with no Content-Range; any other partNumber is out of range.
// Both range read and partNumber can't be used together.
if input.PartNumber != nil {
mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
if metaErr == nil {
var mpMeta backend.MpUploadMetadata
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
}
partNum := *input.PartNumber
totalParts := int32(len(mpMeta.Parts))
partsCount = &totalParts
if partNum > totalParts {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
// Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1
if partNum > 1 {
startOffset = mpMeta.Parts[partNum-2]
}
length = mpMeta.Parts[partNum-1] - startOffset
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size))
} else if errors.Is(metaErr, meta.ErrNoSuchKey) {
// Non-multipart object: partNumber=1 means the whole object; anything
// higher is out of range
if *input.PartNumber > 1 {
return nil, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange)
}
length = size
} else {
return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr)
}
} else {
start, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
if err != nil {
return nil, err
}
startOffset, length = start, lgth
if isValid {
contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, size))
}
}
var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
@@ -4622,7 +4738,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
return &s3.HeadObjectOutput{
ContentLength: &length,
AcceptRanges: backend.GetPtrFromString("bytes"),
ContentRange: &contentRange,
ContentRange: contentRange,
ContentType: objMeta.ContentType,
ContentEncoding: objMeta.ContentEncoding,
ContentDisposition: objMeta.ContentDisposition,
@@ -4644,6 +4760,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
ChecksumCRC64NVME: checksums.CRC64NVME,
ChecksumType: checksums.Type,
TagCount: tagCount,
PartsCount: partsCount,
}, nil
}

View File

@@ -460,6 +460,15 @@ func (c S3ApiController) GetObject(ctx *fiber.Ctx) (*Response, error) {
}, s3err.GetAPIError(s3err.ErrInvalidPartNumber)
}
if acceptRange != "" {
debuglogger.Logf("Range and partNumber cannot both be specified")
return &Response{
MetaOpts: &MetaOptions{
BucketOwner: parsedAcl.Owner,
},
}, s3err.GetAPIError(s3err.ErrRangeAndPartNumber)
}
partNumber = &partNumberQuery
}

View File

@@ -722,6 +722,26 @@ func TestS3ApiController_GetObject(t *testing.T) {
err: s3err.GetAPIError(s3err.ErrInvalidPartNumber),
},
},
{
name: "both partNumber and Range",
input: testInput{
locals: defaultLocals,
queries: map[string]string{
"partNumber": "2",
},
headers: map[string]string{
"Range": "bytes=10-20",
},
},
output: testOutput{
response: &Response{
MetaOpts: &MetaOptions{
BucketOwner: "root",
},
},
err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber),
},
},
{
name: "backend returns error",
input: testInput{

View File

@@ -107,6 +107,15 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) (*Response, error) {
}, s3err.GetAPIError(s3err.ErrInvalidPartNumber)
}
if objRange != "" {
debuglogger.Logf("Range and partNumber cannot both be specified")
return &Response{
MetaOpts: &MetaOptions{
BucketOwner: parsedAcl.Owner,
},
}, s3err.GetAPIError(s3err.ErrRangeAndPartNumber)
}
partNumber = &partNumberQuery
}

View File

@@ -98,6 +98,26 @@ func TestS3ApiController_HeadObject(t *testing.T) {
err: s3err.GetAPIError(s3err.ErrInvalidPartNumber),
},
},
{
name: "both partNumber and Range",
input: testInput{
locals: defaultLocals,
queries: map[string]string{
"partNumber": "6",
},
headers: map[string]string{
"Range": "bytes=1-3",
},
},
output: testOutput{
response: &Response{
MetaOpts: &MetaOptions{
BucketOwner: "root",
},
},
err: s3err.GetAPIError(s3err.ErrRangeAndPartNumber),
},
},
{
name: "invalid checksum mode",
input: testInput{

View File

@@ -81,6 +81,8 @@ const (
ErrInvalidObjectAttributes
ErrInvalidPart
ErrInvalidPartNumber
ErrInvalidPartNumberRange
ErrRangeAndPartNumber
ErrInvalidPartOrder
ErrInvalidCompleteMpPartNumber
ErrInternalError
@@ -339,6 +341,16 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Part number must be an integer between 1 and 10000, inclusive.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidPartNumberRange: {
Code: "InvalidPartNumber",
Description: "The requested partnumber is not satisfiable.",
HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable,
},
ErrRangeAndPartNumber: {
Code: "InvalidRequest",
Description: "Cannot specify both Range header and partNumber query parameter",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidPartOrder: {
Code: "InvalidPartOrder",
Description: "The list of parts was not in ascending order. Parts must be ordered by part number.",

View File

@@ -1907,13 +1907,77 @@ func CompleteMultipartUpload_racey_success(s *S3Conf) error {
})
}
// CompleteMultipartUpload_already_completed verifies that
// CompleteMultipartUpload is idempotent: calling it a second time with the
// same upload ID and part list must succeed and return the same Bucket, Key,
// ETag, and Location as the first call, rather than failing with
// NoSuchUpload.
func CompleteMultipartUpload_already_completed(s *S3Conf) error {
	testName := "CompleteMultipartUpload_already_completed"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"
		out, err := createMp(s3client, bucket, obj)
		if err != nil {
			return err
		}
		// Upload a single 5 MiB part (the S3 minimum size for non-final parts).
		parts, _, err := uploadParts(s3client, 5*1024*1024, 1, bucket, obj, *out.UploadId)
		if err != nil {
			return err
		}
		compParts := []types.CompletedPart{}
		for _, el := range parts {
			compParts = append(compParts, types.CompletedPart{
				ETag:       el.ETag,
				PartNumber: el.PartNumber,
			})
		}
		// Reuse the identical input for both complete calls so the second
		// call is byte-for-byte the same request as the first.
		completeInput := &s3.CompleteMultipartUploadInput{
			Bucket:   &bucket,
			Key:      &obj,
			UploadId: out.UploadId,
			MultipartUpload: &types.CompletedMultipartUpload{
				Parts: compParts,
			},
		}
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		first, err := s3client.CompleteMultipartUpload(ctx, completeInput)
		cancel()
		if err != nil {
			return err
		}
		// Second call with the same upload ID should also succeed (idempotent).
		ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
		second, err := s3client.CompleteMultipartUpload(ctx, completeInput)
		cancel()
		if err != nil {
			return err
		}
		// The repeated completion must describe the same committed object.
		if getString(second.Bucket) != bucket {
			return fmt.Errorf("expected Bucket to be %s, instead got %s", bucket, getString(second.Bucket))
		}
		if getString(second.Key) != obj {
			return fmt.Errorf("expected Key to be %s, instead got %s", obj, getString(second.Key))
		}
		if getString(second.ETag) != getString(first.ETag) {
			return fmt.Errorf("expected ETag to be %s, instead got %s", getString(first.ETag), getString(second.ETag))
		}
		location := constructObjectLocation(s.endpoint, bucket, obj, s.hostStyle)
		if getString(second.Location) != location {
			return fmt.Errorf("expected Location to be %s, instead got %s", location, getString(second.Location))
		}
		return nil
	})
}
// CompleteMultipartUpload_racey_data_integrity creates a single multipart
// upload, uploads its parts, then fires multiple concurrent
// CompleteMultipartUpload calls for the exact same upload ID. Exactly one
// call must succeed; the rest must fail with NoSuchUpload (the upload was
// already consumed). The surviving object must contain exactly the data that
// was uploaded. The whole sequence is repeated several times so that
// intermittent scheduling cannot hide a data-corruption bug.
// CompleteMultipartUpload calls for the exact same upload ID. All calls must
// succeed and return the same ETag (idempotent completion). The surviving
// object must contain exactly the data that was uploaded. The whole sequence
// is repeated several times so that intermittent scheduling cannot hide a
// data-corruption bug.
func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
testName := "CompleteMultipartUpload_racey_data_integrity"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
@@ -1924,7 +1988,6 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
)
obj := "my-obj"
objSize := int64(15 * 1024 * 1024) // 3 × 5 MiB parts
noSuchUpload := s3err.GetAPIError(s3err.ErrNoSuchUpload)
for iter := range iterations {
// Phase 1: create one upload and upload its parts.
@@ -1938,25 +2001,26 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
return fmt.Errorf("iteration %d: upload parts: %w", iter, err)
}
var partsEtagBytes []byte
compParts := make([]types.CompletedPart, 0, len(parts))
for _, el := range parts {
b, err := getEtagBytes(*el.ETag)
if err != nil {
return fmt.Errorf("iteration %d: %w", iter, err)
}
partsEtagBytes = append(partsEtagBytes, b...)
compParts = append(compParts, types.CompletedPart{
ETag: el.ETag,
PartNumber: el.PartNumber,
})
}
expectedETag := fmt.Sprintf("\"%s-%d\"", md5String(partsEtagBytes), len(parts))
// Phase 2: race concurrency complete calls for the same upload ID.
// Exactly one must succeed; the others must return NoSuchUpload.
var (
mu sync.Mutex
successCnt int
)
eg := errgroup.Group{}
for range concurrency {
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
res, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: out.UploadId,
@@ -1965,23 +2029,20 @@ func CompleteMultipartUpload_racey_data_integrity(s *S3Conf) error {
},
})
cancel()
if err == nil {
mu.Lock()
successCnt++
mu.Unlock()
return nil
if err != nil {
return err
}
// Every non-zero error must be NoSuchUpload.
return checkApiErr(err, noSuchUpload)
if getString(res.ETag) != expectedETag {
return fmt.Errorf("expected the multipart upload ETag to be %s, instead got %s", expectedETag, getString(res.ETag))
}
return nil
})
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("iteration %d: complete phase: %w", iter, err)
}
if successCnt != 1 {
return fmt.Errorf("iteration %d: expected exactly 1 successful complete, got %d",
iter, successCnt)
}
// Phase 3: download the object and verify it is complete and
// uncorrupted — its checksum must match what was uploaded.

View File

@@ -15,6 +15,7 @@
package integration
import (
"bytes"
"context"
"crypto/sha256"
"errors"
@@ -1256,17 +1257,246 @@ func GetObject_invalid_part_number(s *S3Conf) error {
})
}
func GetObject_part_number_not_supported(s *S3Conf) error {
testName := "GetObject_part_number_not_supported"
func GetObject_range_and_part_number(s *S3Conf) error {
testName := "GetObject_range_and_part_number"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
defer cancel()
_, err := s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: getPtr("obj"),
PartNumber: getPtr(int32(3)),
})
obj := "my-obj"
_, err := putObjectWithData(100, &s3.PutObjectInput{
Bucket: &bucket,
Key: &obj,
}, s3client)
if err != nil {
return err
}
return checkApiErr(err, s3err.GetAPIError(s3err.ErrNotImplemented))
pn := int32(1)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &obj,
Range: getPtr("bytes=0-9"),
PartNumber: &pn,
})
cancel()
return checkApiErr(err, s3err.GetAPIError(s3err.ErrRangeAndPartNumber))
})
}
// GetObject_mp_part_number_exceeds_parts_count verifies that a GetObject
// with a partNumber greater than the number of parts in a completed
// multipart upload fails with the InvalidPartNumber (range-not-satisfiable)
// error.
func GetObject_mp_part_number_exceeds_parts_count(s *S3Conf) error {
	testName := "GetObject_mp_part_number_exceeds_parts_count"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"
		out, err := createMp(s3client, bucket, obj)
		if err != nil {
			return err
		}
		// Build a 5-part object, 5 MiB per part.
		const partCount = int64(5)
		parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId)
		if err != nil {
			return err
		}
		compParts := make([]types.CompletedPart, len(parts))
		for i, p := range parts {
			compParts[i] = types.CompletedPart{
				ETag:       p.ETag,
				PartNumber: p.PartNumber,
			}
		}
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
			Bucket:   &bucket,
			Key:      &obj,
			UploadId: out.UploadId,
			MultipartUpload: &types.CompletedMultipartUpload{
				Parts: compParts,
			},
		})
		cancel()
		if err != nil {
			return err
		}
		// partNumber exceeds the number of parts in the completed upload
		pn := int32(partCount + 1)
		ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
		_, err = s3client.GetObject(ctx, &s3.GetObjectInput{
			Bucket:     &bucket,
			Key:        &obj,
			PartNumber: &pn,
		})
		cancel()
		return checkApiErr(err, s3err.GetAPIError(s3err.ErrInvalidPartNumberRange))
	})
}
// GetObject_mp_part_number_success completes a 3-part multipart upload and
// then fetches each part individually via GetObject partNumber, checking
// PartsCount, Content-Range, Content-Length, Accept-Ranges, and that each
// returned body is byte-identical (by SHA-256) to the data uploaded for
// that part.
func GetObject_mp_part_number_success(s *S3Conf) error {
	testName := "GetObject_mp_part_number_success"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"
		out, err := createMp(s3client, bucket, obj)
		if err != nil {
			return err
		}
		const partCount = 3
		const partSize = int64(5 * 1024 * 1024)
		const totalSize = int64(partCount) * partSize
		// Upload parts manually to capture per-part data for integrity checks
		partNumbers := make([]int32, partCount)
		partChecksums := make([][32]byte, partCount)
		compParts := make([]types.CompletedPart, partCount)
		for i := range partCount {
			partNumbers[i] = int32(i + 1)
			// Fill each part with a distinct deterministic byte pattern so
			// parts are distinguishable if the backend mixes them up.
			buf := make([]byte, partSize)
			for j := range buf {
				buf[j] = byte(i*17 + j%251)
			}
			partChecksums[i] = sha256.Sum256(buf)
			ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
			res, err := s3client.UploadPart(ctx, &s3.UploadPartInput{
				Bucket:     &bucket,
				Key:        &obj,
				UploadId:   out.UploadId,
				Body:       bytes.NewReader(buf),
				PartNumber: &partNumbers[i],
			})
			cancel()
			if err != nil {
				return fmt.Errorf("upload part %d: %w", partNumbers[i], err)
			}
			compParts[i] = types.CompletedPart{
				ETag:       res.ETag,
				PartNumber: &partNumbers[i],
			}
		}
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
			Bucket:   &bucket,
			Key:      &obj,
			UploadId: out.UploadId,
			MultipartUpload: &types.CompletedMultipartUpload{
				Parts: compParts,
			},
		})
		cancel()
		if err != nil {
			return err
		}
		// Fetch each part back by partNumber and validate the response
		// metadata plus the body contents.
		for i := range partCount {
			pn := int32(i + 1)
			startByte := int64(i) * partSize
			endByte := startByte + partSize - 1
			expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize)
			ctx, cancel := context.WithTimeout(context.Background(), longTimeout)
			res, err := s3client.GetObject(ctx, &s3.GetObjectInput{
				Bucket:     &bucket,
				Key:        &obj,
				PartNumber: &pn,
			})
			if err != nil {
				cancel()
				return fmt.Errorf("part %d: %w", pn, err)
			}
			if res.PartsCount == nil {
				cancel()
				return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn)
			}
			if *res.PartsCount != int32(partCount) {
				cancel()
				return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount)
			}
			if getString(res.ContentRange) != expectedContentRange {
				cancel()
				return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange))
			}
			if res.ContentLength == nil || *res.ContentLength != partSize {
				cancel()
				return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength)
			}
			if getString(res.AcceptRanges) != "bytes" {
				cancel()
				return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges))
			}
			body, readErr := io.ReadAll(res.Body)
			cancel()
			res.Body.Close()
			if readErr != nil {
				return fmt.Errorf("part %d: read body: %w", pn, readErr)
			}
			// The returned bytes must match exactly what was uploaded for
			// this part number.
			gotSum := sha256.Sum256(body)
			if gotSum != partChecksums[i] {
				return fmt.Errorf("part %d: data integrity check failed: body checksum mismatch", pn)
			}
		}
		return nil
	})
}
// GetObject_non_mp_part_number_1_success verifies that partNumber=1 on an
// object that was NOT created via multipart upload returns the full object:
// full Content-Length, no Content-Range, nil PartsCount, and an intact body
// (checked by SHA-256 against the uploaded data).
func GetObject_non_mp_part_number_1_success(s *S3Conf) error {
	testName := "GetObject_non_mp_part_number_1_success"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "put-object-part1"
		const objSize = int64(1234)
		out, err := putObjectWithData(objSize, &s3.PutObjectInput{
			Bucket: &bucket,
			Key:    &obj,
		}, s3client)
		if err != nil {
			return err
		}
		pn := int32(1)
		ctx, cancel := context.WithTimeout(context.Background(), longTimeout)
		res, err := s3client.GetObject(ctx, &s3.GetObjectInput{
			Bucket:     &bucket,
			Key:        &obj,
			PartNumber: &pn,
		})
		if err != nil {
			cancel()
			return err
		}
		// A non-multipart object has exactly one "part": the whole body.
		if res.ContentLength == nil || *res.ContentLength != objSize {
			cancel()
			return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength)
		}
		if getString(res.ContentRange) != "" {
			cancel()
			return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange))
		}
		if res.PartsCount != nil {
			cancel()
			return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount)
		}
		if getString(res.AcceptRanges) != "bytes" {
			cancel()
			return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges))
		}
		body, readErr := io.ReadAll(res.Body)
		cancel()
		res.Body.Close()
		if readErr != nil {
			return fmt.Errorf("read body: %w", readErr)
		}
		// Compare against the checksum recorded when the object was uploaded.
		if gotSum := sha256.Sum256(body); gotSum != out.csum {
			return fmt.Errorf("data integrity check failed: body checksum mismatch")
		}
		return nil
	})
}

View File

@@ -62,21 +62,6 @@ func HeadObject_invalid_part_number(s *S3Conf) error {
})
}
func HeadObject_part_number_not_supported(s *S3Conf) error {
testName := "HeadObject_part_number_not_supported"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
partNumber := int32(4)
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &bucket,
Key: getPtr("my-obj"),
PartNumber: &partNumber,
})
cancel()
return checkSdkApiErr(err, "NotImplemented")
})
}
func HeadObject_non_existing_dir_object(s *S3Conf) error {
testName := "HeadObject_non_existing_dir_object"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
@@ -930,6 +915,202 @@ func HeadObject_overrides_presign_success(s *S3Conf) error {
})
}
// HeadObject_range_and_part_number verifies that specifying both a Range
// header and the partNumber query parameter on HeadObject is rejected with
// a BadRequest error (HEAD responses carry no body, so only the SDK error
// code is checked).
func HeadObject_range_and_part_number(s *S3Conf) error {
	testName := "HeadObject_range_and_part_number"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"
		_, err := putObjectWithData(100, &s3.PutObjectInput{
			Bucket: &bucket,
			Key:    &obj,
		}, s3client)
		if err != nil {
			return err
		}
		pn := int32(1)
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		_, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{
			Bucket:     &bucket,
			Key:        &obj,
			Range:      getPtr("bytes=0-9"),
			PartNumber: &pn,
		})
		cancel()
		return checkSdkApiErr(err, "BadRequest")
	})
}
// HeadObject_mp_part_number_exceeds_parts_count verifies that a HeadObject
// with a partNumber greater than the number of parts in a completed
// multipart upload fails with RequestedRangeNotSatisfiable.
func HeadObject_mp_part_number_exceeds_parts_count(s *S3Conf) error {
	testName := "HeadObject_mp_part_number_exceeds_parts_count"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"
		out, err := createMp(s3client, bucket, obj)
		if err != nil {
			return err
		}
		// Build a 5-part object, 5 MiB per part.
		const partCount = int64(5)
		parts, _, err := uploadParts(s3client, partCount*5*1024*1024, partCount, bucket, obj, *out.UploadId)
		if err != nil {
			return err
		}
		compParts := make([]types.CompletedPart, len(parts))
		for i, p := range parts {
			compParts[i] = types.CompletedPart{
				ETag:       p.ETag,
				PartNumber: p.PartNumber,
			}
		}
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
			Bucket:   &bucket,
			Key:      &obj,
			UploadId: out.UploadId,
			MultipartUpload: &types.CompletedMultipartUpload{
				Parts: compParts,
			},
		})
		cancel()
		if err != nil {
			return err
		}
		// partNumber exceeds the number of parts in the completed upload
		pn := int32(partCount + 1)
		ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
		_, err = s3client.HeadObject(ctx, &s3.HeadObjectInput{
			Bucket:     &bucket,
			Key:        &obj,
			PartNumber: &pn,
		})
		cancel()
		return checkSdkApiErr(err, "RequestedRangeNotSatisfiable")
	})
}
// HeadObject_mp_part_number_success completes a 3-part multipart upload and
// issues a HeadObject per part, validating PartsCount, Content-Range,
// Content-Length, and Accept-Ranges for each partNumber. (No body checks —
// HEAD responses have no body.)
func HeadObject_mp_part_number_success(s *S3Conf) error {
	testName := "HeadObject_mp_part_number_success"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "my-obj"
		out, err := createMp(s3client, bucket, obj)
		if err != nil {
			return err
		}
		const partCount = int64(3)
		const partSize = int64(5 * 1024 * 1024)
		const totalSize = partCount * partSize
		parts, _, err := uploadParts(s3client, totalSize, partCount, bucket, obj, *out.UploadId)
		if err != nil {
			return err
		}
		compParts := make([]types.CompletedPart, len(parts))
		for i, p := range parts {
			compParts[i] = types.CompletedPart{
				ETag:       p.ETag,
				PartNumber: p.PartNumber,
			}
		}
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		_, err = s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
			Bucket:   &bucket,
			Key:      &obj,
			UploadId: out.UploadId,
			MultipartUpload: &types.CompletedMultipartUpload{
				Parts: compParts,
			},
		})
		cancel()
		if err != nil {
			return err
		}
		// Head each part and check the per-part response headers.
		for i := range partCount {
			pn := int32(i + 1)
			startByte := i * partSize
			endByte := startByte + partSize - 1
			expectedContentRange := fmt.Sprintf("bytes %d-%d/%d", startByte, endByte, totalSize)
			ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
			res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{
				Bucket:     &bucket,
				Key:        &obj,
				PartNumber: &pn,
			})
			cancel()
			if err != nil {
				return fmt.Errorf("part %d: %w", pn, err)
			}
			if res.PartsCount == nil {
				return fmt.Errorf("part %d: expected non-nil x-amz-mp-parts-count", pn)
			}
			if *res.PartsCount != int32(partCount) {
				return fmt.Errorf("part %d: expected PartsCount %d, got %d", pn, partCount, *res.PartsCount)
			}
			if getString(res.ContentRange) != expectedContentRange {
				return fmt.Errorf("part %d: expected Content-Range %q, got %q", pn, expectedContentRange, getString(res.ContentRange))
			}
			if res.ContentLength == nil || *res.ContentLength != partSize {
				return fmt.Errorf("part %d: expected Content-Length %d, got %v", pn, partSize, res.ContentLength)
			}
			if getString(res.AcceptRanges) != "bytes" {
				return fmt.Errorf("part %d: expected Accept-Ranges 'bytes', got %q", pn, getString(res.AcceptRanges))
			}
		}
		return nil
	})
}
// HeadObject_non_mp_part_number_1_success verifies that partNumber=1 on an
// object that was NOT created via multipart upload behaves like a plain
// HeadObject: full Content-Length, no Content-Range, and nil PartsCount.
func HeadObject_non_mp_part_number_1_success(s *S3Conf) error {
	testName := "HeadObject_non_mp_part_number_1_success"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		obj := "put-object-part1"
		const objSize = int64(1234)
		_, err := putObjectWithData(objSize, &s3.PutObjectInput{
			Bucket: &bucket,
			Key:    &obj,
		}, s3client)
		if err != nil {
			return err
		}
		pn := int32(1)
		ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
		res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{
			Bucket:     &bucket,
			Key:        &obj,
			PartNumber: &pn,
		})
		cancel()
		if err != nil {
			return err
		}
		// A non-multipart object has exactly one "part": the whole object.
		if res.ContentLength == nil || *res.ContentLength != objSize {
			return fmt.Errorf("expected ContentLength %d, got %v", objSize, res.ContentLength)
		}
		if getString(res.ContentRange) != "" {
			return fmt.Errorf("expected empty Content-Range for non-multipart object, got %q", getString(res.ContentRange))
		}
		if res.PartsCount != nil {
			return fmt.Errorf("expected nil PartsCount for non-multipart object, got %d", *res.PartsCount)
		}
		if getString(res.AcceptRanges) != "bytes" {
			return fmt.Errorf("expected Accept-Ranges 'bytes', got %q", getString(res.AcceptRanges))
		}
		return nil
	})
}
func HeadObject_overrides_fail_public(s *S3Conf) error {
testName := "HeadObject_overrides_fail_public"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {

View File

@@ -201,7 +201,6 @@ func TestPutObject(ts *TestState) {
func TestHeadObject(ts *TestState) {
ts.Run(HeadObject_non_existing_object)
ts.Run(HeadObject_invalid_part_number)
ts.Run(HeadObject_part_number_not_supported)
ts.Run(HeadObject_directory_object_noslash)
ts.Run(HeadObject_non_existing_dir_object)
ts.Run(HeadObject_invalid_parent_dir)
@@ -218,6 +217,10 @@ func TestHeadObject(ts *TestState) {
ts.Run(HeadObject_overrides_success)
ts.Run(HeadObject_overrides_presign_success)
ts.Run(HeadObject_overrides_fail_public)
ts.Run(HeadObject_range_and_part_number)
ts.Run(HeadObject_mp_part_number_exceeds_parts_count)
ts.Run(HeadObject_mp_part_number_success)
ts.Run(HeadObject_non_mp_part_number_1_success)
}
func TestGetObjectAttributes(ts *TestState) {
@@ -256,7 +259,10 @@ func TestGetObject(ts *TestState) {
ts.Run(GetObject_overrides_presign_success)
ts.Run(GetObject_overrides_fail_public)
ts.Run(GetObject_invalid_part_number)
ts.Run(GetObject_part_number_not_supported)
ts.Run(GetObject_range_and_part_number)
ts.Run(GetObject_mp_part_number_exceeds_parts_count)
ts.Run(GetObject_mp_part_number_success)
ts.Run(GetObject_non_mp_part_number_1_success)
}
func TestListObjects(ts *TestState) {
@@ -537,6 +543,7 @@ func TestCompleteMultipartUpload(ts *TestState) {
ts.Run(CompleteMultipartUpload_with_metadata)
}
ts.Run(CompleteMultipartUpload_success)
ts.Run(CompleteMultipartUpload_already_completed)
if !ts.conf.azureTests {
ts.Run(CompleteMultipartUpload_racey_success)
ts.Run(CompleteMultipartUpload_racey_data_integrity)
@@ -1383,7 +1390,6 @@ func GetIntTests() IntTests {
"PutObject_racey_success": PutObject_racey_success,
"HeadObject_non_existing_object": HeadObject_non_existing_object,
"HeadObject_invalid_part_number": HeadObject_invalid_part_number,
"HeadObject_part_number_not_supported": HeadObject_part_number_not_supported,
"HeadObject_directory_object_noslash": HeadObject_directory_object_noslash,
"HeadObject_non_existing_dir_object": HeadObject_non_existing_dir_object,
"HeadObject_name_too_long": HeadObject_name_too_long,
@@ -1398,6 +1404,10 @@ func GetIntTests() IntTests {
"HeadObject_overrides_success": HeadObject_overrides_success,
"HeadObject_overrides_presign_success": HeadObject_overrides_presign_success,
"HeadObject_overrides_fail_public": HeadObject_overrides_fail_public,
"HeadObject_range_and_part_number": HeadObject_range_and_part_number,
"HeadObject_mp_part_number_exceeds_parts_count": HeadObject_mp_part_number_exceeds_parts_count,
"HeadObject_mp_part_number_success": HeadObject_mp_part_number_success,
"HeadObject_non_mp_part_number_1_success": HeadObject_non_mp_part_number_1_success,
"GetObjectAttributes_non_existing_bucket": GetObjectAttributes_non_existing_bucket,
"GetObjectAttributes_non_existing_object": GetObjectAttributes_non_existing_object,
"GetObjectAttributes_invalid_attrs": GetObjectAttributes_invalid_attrs,
@@ -1424,7 +1434,10 @@ func GetIntTests() IntTests {
"GetObject_overrides_presign_success": GetObject_overrides_presign_success,
"GetObject_overrides_fail_public": GetObject_overrides_fail_public,
"GetObject_invalid_part_number": GetObject_invalid_part_number,
"GetObject_part_number_not_supported": GetObject_part_number_not_supported,
"GetObject_range_and_part_number": GetObject_range_and_part_number,
"GetObject_mp_part_number_exceeds_parts_count": GetObject_mp_part_number_exceeds_parts_count,
"GetObject_mp_part_number_success": GetObject_mp_part_number_success,
"GetObject_non_mp_part_number_1_success": GetObject_non_mp_part_number_1_success,
"ListObjects_non_existing_bucket": ListObjects_non_existing_bucket,
"ListObjects_with_prefix": ListObjects_with_prefix,
"ListObjects_truncated": ListObjects_truncated,
@@ -1626,6 +1639,7 @@ func GetIntTests() IntTests {
"CompleteMultipartUpload_should_ignore_the_final_checksum": CompleteMultipartUpload_should_ignore_the_final_checksum,
"CompleteMultipartUpload_should_succeed_without_final_checksum_type": CompleteMultipartUpload_should_succeed_without_final_checksum_type,
"CompleteMultipartUpload_success": CompleteMultipartUpload_success,
"CompleteMultipartUpload_already_completed": CompleteMultipartUpload_already_completed,
"CompleteMultipartUpload_racey_success": CompleteMultipartUpload_racey_success,
"CompleteMultipartUpload_racey_data_integrity": CompleteMultipartUpload_racey_data_integrity,
"PutBucketAcl_non_existing_bucket": PutBucketAcl_non_existing_bucket,

View File

@@ -2700,3 +2700,12 @@ func sendPostObject(input PostRequestConfig) (*http.Response, error) {
return input.s3Conf.httpClient.Do(req)
}
// getEtagBytes decodes a (typically double-quoted) hex ETag string into its
// raw digest bytes. All double-quote characters are stripped before hex
// decoding; a hex decode error is returned for non-hex input.
func getEtagBytes(etag string) ([]byte, error) {
	// `"` raw-string literal replaces the original, obscure string('"').
	return hex.DecodeString(strings.ReplaceAll(etag, `"`, ""))
}
func md5String(data []byte) string {
sum := md5.Sum(data)
return hex.EncodeToString(sum[:])
}