Merge pull request #1916 from versity/ben/azure-test-falures

azure test failure fixes
This commit is contained in:
Ben McClelland
2026-03-05 17:07:41 -08:00
committed by GitHub

View File

@@ -28,6 +28,7 @@ import (
"net/url"
"os"
"path/filepath"
"slices"
"sort"
"strconv"
"strings"
@@ -69,6 +70,9 @@ const (
onameAttr key = "Objname"
onameAttrLower key = "objname"
metaTmpMultipartPrefix key = ".sgwtmp" + "/multipart"
// keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata.
// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here.
keyMpZeroBytesParts key = "Zerobytesparts"
defaultListingMaxKeys = 1000
)
@@ -213,54 +217,61 @@ func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput,
}
func (az *Azure) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
pager := az.client.NewListContainersPager(
&service.ListContainersOptions{
Include: service.ListContainersInclude{
Metadata: true,
},
Marker: &input.ContinuationToken,
MaxResults: &input.MaxBuckets,
Prefix: &input.Prefix,
})
opts := &service.ListContainersOptions{
Include: service.ListContainersInclude{
Metadata: true,
},
Prefix: &input.Prefix,
}
pager := az.client.NewListContainersPager(opts)
var buckets []s3response.ListAllMyBucketsEntry
var cToken string
result := s3response.ListAllMyBucketsResult{
Prefix: input.Prefix,
}
resp, err := pager.NextPage(ctx)
if err != nil {
return result, azureErrToS3Err(err)
}
for _, v := range resp.ContainerItems {
if input.IsAdmin {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *v.Name,
// TODO: using modification date here instead of creation, is that ok?
CreationDate: *v.Properties.LastModified,
})
} else {
acl, err := getAclFromMetadata(v.Metadata, keyAclLower)
if err != nil {
return result, err
outer:
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return result, azureErrToS3Err(err)
}
for _, v := range resp.ContainerItems {
// If we've already filled MaxBuckets, there is a next page — set token and stop
if input.MaxBuckets > 0 && int32(len(buckets)) == input.MaxBuckets {
cToken = buckets[len(buckets)-1].Name
break outer
}
if acl.Owner == input.Owner {
// Skip items at or before the continuation token (client-side "start after")
if input.ContinuationToken != "" && *v.Name <= input.ContinuationToken {
continue
}
if input.IsAdmin {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *v.Name,
// TODO: using modification date here instead of creation, is that ok?
CreationDate: *v.Properties.LastModified,
})
} else {
acl, err := getAclFromMetadata(v.Metadata, keyAclLower)
if err != nil {
return result, err
}
if acl.Owner == input.Owner {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *v.Name,
// TODO: using modification date here instead of creation, is that ok?
CreationDate: *v.Properties.LastModified,
})
}
}
}
}
if resp.NextMarker != nil {
result.ContinuationToken = *resp.NextMarker
}
result.Buckets.Bucket = buckets
result.Owner.ID = input.Owner
result.ContinuationToken = cToken
return result, nil
}
@@ -575,7 +586,8 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3
retention, ok := resp.Metadata[string(keyObjRetention)]
if ok {
var config types.ObjectLockRetention
if err := json.Unmarshal([]byte(*retention), &config); err == nil {
err := json.Unmarshal([]byte(*retention), &config)
if err == nil {
result.ObjectLockMode = types.ObjectLockMode(config.Mode)
result.ObjectLockRetainUntilDate = config.RetainUntilDate
}
@@ -1031,6 +1043,7 @@ func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInpu
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
defer downloadResp.Body.Close()
pInput := s3response.PutObjectInput{
Body: downloadResp.Body,
@@ -1144,16 +1157,20 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input s3response.Cre
}
if len(bucketLock) == 0 {
return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
return s3response.InitiateMultipartUploadResult{},
s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
}
var bucketLockConfig auth.BucketLockConfig
if err := json.Unmarshal(bucketLock, &bucketLockConfig); err != nil {
return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("parse bucket lock config: %w", err)
err = json.Unmarshal(bucketLock, &bucketLockConfig)
if err != nil {
return s3response.InitiateMultipartUploadResult{},
fmt.Errorf("parse bucket lock config: %w", err)
}
if !bucketLockConfig.Enabled {
return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
return s3response.InitiateMultipartUploadResult{},
s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
}
}
@@ -1221,7 +1238,8 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input s3response.Cre
// Each part is translated into an uncommitted block in a newly created blob in staging area
func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId)
if err != nil {
return nil, err
}
@@ -1234,13 +1252,31 @@ func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3
return nil, err
}
// block id serves as etag here
etag := blockIDInt32ToBase64(*input.PartNumber)
// Azure StageBlock rejects Content-Length: 0 as an invalid header value.
// Track zero-byte parts in the sgwtmp metadata instead of staging them.
size, err := rdr.Seek(0, io.SeekEnd)
if err != nil {
return nil, err
}
if _, err = rdr.Seek(0, io.SeekStart); err != nil {
return nil, err
}
if size == 0 {
err := az.trackZeroBytePart(ctx, *input.Bucket, *input.Key, *input.UploadId, *input.PartNumber)
if err != nil {
return nil, err
}
return &s3.UploadPartOutput{ETag: &etag}, nil
}
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
}
// block id serves as etag here
etag := blockIDInt32ToBase64(*input.PartNumber)
_, err = client.StageBlock(ctx, etag, rdr, nil)
if err != nil {
return nil, parseMpError(err)
@@ -1257,7 +1293,8 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp
return s3response.CopyPartResult{}, err
}
if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
err = az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId)
if err != nil {
return s3response.CopyPartResult{}, err
}
@@ -1274,7 +1311,8 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp
// Lists all uncommitted parts from the blob
func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) {
if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId)
if err != nil {
return s3response.ListPartsResult{}, err
}
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
@@ -1290,49 +1328,66 @@ func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3res
if *input.PartNumberMarker != "" {
partNumberMarker, err = strconv.Atoi(*input.PartNumberMarker)
if err != nil {
return s3response.ListPartsResult{}, s3err.GetInvalidMaxLimiterErr("part-number-marker")
return s3response.ListPartsResult{},
s3err.GetInvalidMaxLimiterErr("part-number-marker")
}
}
if input.MaxParts != nil {
maxParts = *input.MaxParts
}
resp, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
if err != nil {
// If the mp exists but the client returns 'NoSuchKey' error, return empty result
if errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) {
return s3response.ListPartsResult{
Bucket: *input.Bucket,
Key: *input.Key,
PartNumberMarker: partNumberMarker,
IsTruncated: isTruncated,
MaxParts: int(maxParts),
StorageClass: types.StorageClassStandard,
}, nil
resp, blockListErr := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
if blockListErr != nil {
if !errors.Is(azureErrToS3Err(blockListErr), s3err.GetAPIError(s3err.ErrNoSuchKey)) {
return s3response.ListPartsResult{}, blockListErr
}
// NoSuchKey means no blocks have been staged yet (possible if only zero-byte
// parts exist). Continue so we can still return those from metadata.
}
parts := []s3response.Part{}
for _, el := range resp.UncommittedBlocks {
partNumber, err := decodeBlockId(*el.Name)
if err != nil {
return s3response.ListPartsResult{}, err
if blockListErr == nil {
for _, el := range resp.UncommittedBlocks {
partNumber, err := decodeBlockId(*el.Name)
if err != nil {
return s3response.ListPartsResult{}, err
}
if partNumberMarker >= partNumber {
continue
}
parts = append(parts, s3response.Part{
Size: *el.Size,
ETag: *el.Name,
PartNumber: partNumber,
LastModified: time.Now(),
})
}
if partNumberMarker >= partNumber {
}
// Merge in zero-byte parts tracked in the sgwtmp metadata.
zbParts, _ := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId)
for _, zbPartNum := range zbParts {
if partNumberMarker >= int(zbPartNum) {
continue
}
parts = append(parts, s3response.Part{
Size: *el.Size,
ETag: *el.Name,
PartNumber: partNumber,
Size: 0,
ETag: blockIDInt32ToBase64(zbPartNum),
PartNumber: int(zbPartNum),
LastModified: time.Now(),
})
if len(parts) >= int(maxParts) {
nextPartNumberMarker = partNumber
isTruncated = true
break
}
}
// Sort by part number and apply maxParts limit.
sort.Slice(parts, func(i, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
})
if int32(len(parts)) > maxParts {
parts = parts[:maxParts]
nextPartNumberMarker = parts[len(parts)-1].PartNumber
isTruncated = true
}
return s3response.ListPartsResult{
Bucket: *input.Bucket,
Key: *input.Key,
@@ -1515,10 +1570,23 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
blockList, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
if err != nil {
return res, "", azureErrToS3Err(err)
if !errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) {
return res, "", azureErrToS3Err(err)
}
// NoSuchKey: no blocks staged; only zero-byte parts may exist.
}
if len(blockList.UncommittedBlocks) != len(input.MultipartUpload.Parts) {
// Collect zero-byte parts tracked in the sgwtmp metadata.
zbParts, err := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId)
if err != nil {
return res, "", err
}
zbPartsMap := make(map[int32]bool, len(zbParts))
for _, p := range zbParts {
zbPartsMap[p] = true
}
if len(blockList.UncommittedBlocks)+len(zbParts) != len(input.MultipartUpload.Parts) {
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
@@ -1532,10 +1600,10 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
uncommittedBlocks[int32(ptNumber)] = el
}
// The initialie values is the lower limit of partNumber: 0
// The initial value is the lower limit of partNumber: 0
var totalSize int64
var partNumber int32
last := len(blockList.UncommittedBlocks) - 1
last := len(input.MultipartUpload.Parts) - 1
for i, part := range input.MultipartUpload.Parts {
if part.PartNumber == nil {
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
@@ -1550,6 +1618,19 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
block, ok := uncommittedBlocks[*part.PartNumber]
if !ok {
// Check if this is a tracked zero-byte part.
if zbPartsMap[*part.PartNumber] {
expectedETag := blockIDInt32ToBase64(*part.PartNumber)
if getString(part.ETag) != expectedETag {
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
// Non-last zero-byte parts violate the minimum part size.
if i < last {
return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
// Zero-byte parts contribute no data; skip adding to blockIds.
continue
}
return res, "", s3err.GetAPIError(s3err.ErrInvalidPart)
}
@@ -1566,9 +1647,13 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
}
if input.MpuObjectSize != nil && totalSize != *input.MpuObjectSize {
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
return res, "",
s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
}
// Remove internal tracking keys from metadata before storing on the final blob.
delete(props.Metadata, string(keyMpZeroBytesParts))
opts := &blockblob.CommitBlockListOptions{
Metadata: props.Metadata,
Tags: parseAzTags(tags.BlobTagSet),
@@ -1834,7 +1919,8 @@ func (az *Azure) isBucketObjectLockEnabled(ctx context.Context, bucket string) e
}
var bucketLockConfig auth.BucketLockConfig
if err := json.Unmarshal(cfg, &bucketLockConfig); err != nil {
err = json.Unmarshal(cfg, &bucketLockConfig)
if err != nil {
return fmt.Errorf("parse bucket lock config: %w", err)
}
@@ -2119,6 +2205,77 @@ func createMetaTmpPath(obj, uploadId string) string {
return filepath.Join(string(metaTmpMultipartPrefix), uploadId, fmt.Sprintf("%x", objNameSum))
}
// trackZeroBytePart records a zero-byte upload part in the sgwtmp metadata.
// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here.
func (az *Azure) trackZeroBytePart(ctx context.Context, bucket, key, uploadId string, partNumber int32) error {
	bc, err := az.getBlobClient(bucket, createMetaTmpPath(key, uploadId))
	if err != nil {
		return err
	}
	props, err := bc.GetProperties(ctx, nil)
	if err != nil {
		return azureErrToS3Err(err)
	}
	meta := props.Metadata
	if meta == nil {
		meta = make(map[string]*string)
	}
	// Idempotent on retries: only append when this partNumber is not
	// already tracked, so the list never holds duplicates.
	tracked := parseZeroByteParts(meta)
	if !slices.Contains(tracked, partNumber) {
		tracked = append(tracked, partNumber)
	}
	encoded := serializeZeroByteParts(tracked)
	meta[string(keyMpZeroBytesParts)] = &encoded
	_, err = bc.SetMetadata(ctx, meta, nil)
	return azureErrToS3Err(err)
}
// getZeroByteParts returns the list of zero-byte parts tracked in the sgwtmp metadata.
func (az *Azure) getZeroByteParts(ctx context.Context, bucket, key, uploadId string) ([]int32, error) {
	bc, err := az.getBlobClient(bucket, createMetaTmpPath(key, uploadId))
	if err != nil {
		return nil, err
	}
	props, err := bc.GetProperties(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}
	return parseZeroByteParts(props.Metadata), nil
}
// parseZeroByteParts decodes the comma-separated part numbers stored under
// keyMpZeroBytesParts in blob metadata. Entries that do not parse as 32-bit
// integers are silently skipped; a missing, nil, or empty value yields nil.
func parseZeroByteParts(meta map[string]*string) []int32 {
	raw, ok := meta[string(keyMpZeroBytesParts)]
	if !ok || raw == nil || *raw == "" {
		return nil
	}
	var out []int32
	for field := range strings.SplitSeq(*raw, ",") {
		n, err := strconv.ParseInt(strings.TrimSpace(field), 10, 32)
		if err != nil {
			continue
		}
		out = append(out, int32(n))
	}
	return out
}
// serializeZeroByteParts encodes part numbers as a comma-separated string
// suitable for storing in blob metadata (the inverse of parseZeroByteParts).
func serializeZeroByteParts(parts []int32) string {
	var b strings.Builder
	for i, p := range parts {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(strconv.FormatInt(int64(p), 10))
	}
	return b.String()
}
// Checks if the multipart upload exists with the given bucket, key and uploadId
func (az *Azure) checkIfMpExists(ctx context.Context, bucket, obj, uploadId string) error {
tmpPath := createMetaTmpPath(obj, uploadId)