From 929048cbee0dbc963a8582a0484cd9868dafb8f3 Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Sat, 28 Feb 2026 14:49:48 -0800 Subject: [PATCH 1/4] fix: azure PresignedAuth_UploadPart test failure Azure Storage's StageBlock REST API rejects Content-Length: 0 with InvalidHeaderValue. The tests (PresignedAuth_UploadPart, UploadPart_success) upload a nil/empty body, which causes the Azure SDK to send Content-Length: 0. Azurite is lenient and accepts it; real Azure Storage does not. Use a new metadata key ("Zerobytesparts") set on the .sgwtmp/multipart/<uploadId>/<object name hash> blob to track any 0 length parts. --- backend/azure/azure.go | 238 +++++++++++++++++++++++++++++++++-------- 1 file changed, 196 insertions(+), 42 deletions(-) diff --git a/backend/azure/azure.go b/backend/azure/azure.go index a454a36b..bb6f420a 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -69,6 +69,9 @@ const ( onameAttr key = "Objname" onameAttrLower key = "objname" metaTmpMultipartPrefix key = ".sgwtmp" + "/multipart" + // keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata. + // Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here. 
+ keyMpZeroBytesParts key = "Zerobytesparts" defaultListingMaxKeys = 1000 ) @@ -575,7 +578,8 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3 retention, ok := resp.Metadata[string(keyObjRetention)] if ok { var config types.ObjectLockRetention - if err := json.Unmarshal([]byte(*retention), &config); err == nil { + err := json.Unmarshal([]byte(*retention), &config) + if err == nil { result.ObjectLockMode = types.ObjectLockMode(config.Mode) result.ObjectLockRetainUntilDate = config.RetainUntilDate } @@ -1144,16 +1148,20 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input s3response.Cre } if len(bucketLock) == 0 { - return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) + return s3response.InitiateMultipartUploadResult{}, + s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) } var bucketLockConfig auth.BucketLockConfig - if err := json.Unmarshal(bucketLock, &bucketLockConfig); err != nil { - return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("parse bucket lock config: %w", err) + err = json.Unmarshal(bucketLock, &bucketLockConfig) + if err != nil { + return s3response.InitiateMultipartUploadResult{}, + fmt.Errorf("parse bucket lock config: %w", err) } if !bucketLockConfig.Enabled { - return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) + return s3response.InitiateMultipartUploadResult{}, + s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) } } @@ -1221,7 +1229,8 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input s3response.Cre // Each part is translated into an uncommitted block in a newly created blob in staging area func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) { - if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil { + err := 
az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { return nil, err } @@ -1234,13 +1243,31 @@ func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3 return nil, err } + // block id serves as etag here + etag := blockIDInt32ToBase64(*input.PartNumber) + + // Azure StageBlock rejects Content-Length: 0 as an invalid header value. + // Track zero-byte parts in the sgwtmp metadata instead of staging them. + size, err := rdr.Seek(0, io.SeekEnd) + if err != nil { + return nil, err + } + if _, err = rdr.Seek(0, io.SeekStart); err != nil { + return nil, err + } + if size == 0 { + err := az.trackZeroBytePart(ctx, *input.Bucket, *input.Key, *input.UploadId, *input.PartNumber) + if err != nil { + return nil, err + } + return &s3.UploadPartOutput{ETag: &etag}, nil + } + client, err := az.getBlockBlobClient(*input.Bucket, *input.Key) if err != nil { return nil, err } - // block id serves as etag here - etag := blockIDInt32ToBase64(*input.PartNumber) _, err = client.StageBlock(ctx, etag, rdr, nil) if err != nil { return nil, parseMpError(err) @@ -1257,7 +1284,8 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp return s3response.CopyPartResult{}, err } - if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil { + err = az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { return s3response.CopyPartResult{}, err } @@ -1274,7 +1302,8 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp // Lists all uncommitted parts from the blob func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) { - if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil { + err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { return s3response.ListPartsResult{}, err } client, err := 
az.getBlockBlobClient(*input.Bucket, *input.Key) @@ -1290,49 +1319,66 @@ func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3res if *input.PartNumberMarker != "" { partNumberMarker, err = strconv.Atoi(*input.PartNumberMarker) if err != nil { - return s3response.ListPartsResult{}, s3err.GetInvalidMaxLimiterErr("part-number-marker") + return s3response.ListPartsResult{}, + s3err.GetInvalidMaxLimiterErr("part-number-marker") } } if input.MaxParts != nil { maxParts = *input.MaxParts } - resp, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil) - if err != nil { - // If the mp exists but the client returns 'NoSuchKey' error, return empty result - if errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) { - return s3response.ListPartsResult{ - Bucket: *input.Bucket, - Key: *input.Key, - PartNumberMarker: partNumberMarker, - IsTruncated: isTruncated, - MaxParts: int(maxParts), - StorageClass: types.StorageClassStandard, - }, nil + resp, blockListErr := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil) + if blockListErr != nil { + if !errors.Is(azureErrToS3Err(blockListErr), s3err.GetAPIError(s3err.ErrNoSuchKey)) { + return s3response.ListPartsResult{}, blockListErr } + // NoSuchKey means no blocks have been staged yet (possible if only zero-byte + // parts exist). Continue so we can still return those from metadata. 
} parts := []s3response.Part{} - for _, el := range resp.UncommittedBlocks { - partNumber, err := decodeBlockId(*el.Name) - if err != nil { - return s3response.ListPartsResult{}, err + if blockListErr == nil { + for _, el := range resp.UncommittedBlocks { + partNumber, err := decodeBlockId(*el.Name) + if err != nil { + return s3response.ListPartsResult{}, err + } + if partNumberMarker >= partNumber { + continue + } + parts = append(parts, s3response.Part{ + Size: *el.Size, + ETag: *el.Name, + PartNumber: partNumber, + LastModified: time.Now(), + }) } - if partNumberMarker >= partNumber { + } + + // Merge in zero-byte parts tracked in the sgwtmp metadata. + zbParts, _ := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId) + for _, zbPartNum := range zbParts { + if partNumberMarker >= int(zbPartNum) { continue } parts = append(parts, s3response.Part{ - Size: *el.Size, - ETag: *el.Name, - PartNumber: partNumber, + Size: 0, + ETag: blockIDInt32ToBase64(zbPartNum), + PartNumber: int(zbPartNum), LastModified: time.Now(), }) - if len(parts) >= int(maxParts) { - nextPartNumberMarker = partNumber - isTruncated = true - break - } } + + // Sort by part number and apply maxParts limit. + sort.Slice(parts, func(i, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + if int32(len(parts)) > maxParts { + parts = parts[:maxParts] + nextPartNumberMarker = parts[len(parts)-1].PartNumber + isTruncated = true + } + return s3response.ListPartsResult{ Bucket: *input.Bucket, Key: *input.Key, @@ -1515,10 +1561,23 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete blockList, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil) if err != nil { - return res, "", azureErrToS3Err(err) + if !errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) { + return res, "", azureErrToS3Err(err) + } + // NoSuchKey: no blocks staged; only zero-byte parts may exist. 
} - if len(blockList.UncommittedBlocks) != len(input.MultipartUpload.Parts) { + // Collect zero-byte parts tracked in the sgwtmp metadata. + zbParts, err := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { + return res, "", err + } + zbPartsMap := make(map[int32]bool, len(zbParts)) + for _, p := range zbParts { + zbPartsMap[p] = true + } + + if len(blockList.UncommittedBlocks)+len(zbParts) != len(input.MultipartUpload.Parts) { return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) } @@ -1532,10 +1591,10 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete uncommittedBlocks[int32(ptNumber)] = el } - // The initialie values is the lower limit of partNumber: 0 + // The initial value is the lower limit of partNumber: 0 var totalSize int64 var partNumber int32 - last := len(blockList.UncommittedBlocks) - 1 + last := len(input.MultipartUpload.Parts) - 1 for i, part := range input.MultipartUpload.Parts { if part.PartNumber == nil { return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) @@ -1550,6 +1609,19 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete block, ok := uncommittedBlocks[*part.PartNumber] if !ok { + // Check if this is a tracked zero-byte part. + if zbPartsMap[*part.PartNumber] { + expectedETag := blockIDInt32ToBase64(*part.PartNumber) + if getString(part.ETag) != expectedETag { + return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) + } + // Non-last zero-byte parts violate the minimum part size. + if i < last { + return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall) + } + // Zero-byte parts contribute no data; skip adding to blockIds. 
+ continue + } return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) } @@ -1566,9 +1638,13 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete } if input.MpuObjectSize != nil && totalSize != *input.MpuObjectSize { - return res, "", s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize) + return res, "", + s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize) } + // Remove internal tracking keys from metadata before storing on the final blob. + delete(props.Metadata, string(keyMpZeroBytesParts)) + opts := &blockblob.CommitBlockListOptions{ Metadata: props.Metadata, Tags: parseAzTags(tags.BlobTagSet), @@ -1834,7 +1910,8 @@ func (az *Azure) isBucketObjectLockEnabled(ctx context.Context, bucket string) e } var bucketLockConfig auth.BucketLockConfig - if err := json.Unmarshal(cfg, &bucketLockConfig); err != nil { + err = json.Unmarshal(cfg, &bucketLockConfig) + if err != nil { return fmt.Errorf("parse bucket lock config: %w", err) } @@ -2119,6 +2196,83 @@ func createMetaTmpPath(obj, uploadId string) string { return filepath.Join(string(metaTmpMultipartPrefix), uploadId, fmt.Sprintf("%x", objNameSum)) } +// trackZeroBytePart records a zero-byte upload part in the sgwtmp metadata. +// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here. +func (az *Azure) trackZeroBytePart(ctx context.Context, bucket, key, uploadId string, partNumber int32) error { + tmpPath := createMetaTmpPath(key, uploadId) + blobClient, err := az.getBlobClient(bucket, tmpPath) + if err != nil { + return err + } + + props, err := blobClient.GetProperties(ctx, nil) + if err != nil { + return azureErrToS3Err(err) + } + + meta := props.Metadata + if meta == nil { + meta = map[string]*string{} + } + + // Deduplicate: replace an existing entry for the same partNumber. 
+ parts := parseZeroByteParts(meta) + found := false + for _, p := range parts { + if p == partNumber { + found = true + break + } + } + if !found { + parts = append(parts, partNumber) + } + + serialized := serializeZeroByteParts(parts) + meta[string(keyMpZeroBytesParts)] = &serialized + _, err = blobClient.SetMetadata(ctx, meta, nil) + return azureErrToS3Err(err) +} + +// getZeroByteParts returns the list of zero-byte parts tracked in the sgwtmp metadata. +func (az *Azure) getZeroByteParts(ctx context.Context, bucket, key, uploadId string) ([]int32, error) { + tmpPath := createMetaTmpPath(key, uploadId) + blobClient, err := az.getBlobClient(bucket, tmpPath) + if err != nil { + return nil, err + } + + props, err := blobClient.GetProperties(ctx, nil) + if err != nil { + return nil, azureErrToS3Err(err) + } + + return parseZeroByteParts(props.Metadata), nil +} + +func parseZeroByteParts(meta map[string]*string) []int32 { + val, ok := meta[string(keyMpZeroBytesParts)] + if !ok || val == nil || *val == "" { + return nil + } + var parts []int32 + for s := range strings.SplitSeq(*val, ",") { + n, err := strconv.ParseInt(strings.TrimSpace(s), 10, 32) + if err == nil { + parts = append(parts, int32(n)) + } + } + return parts +} + +func serializeZeroByteParts(parts []int32) string { + strs := make([]string, len(parts)) + for i, p := range parts { + strs[i] = strconv.Itoa(int(p)) + } + return strings.Join(strs, ",") +} + // Checks if the multipart upload existis with the given bucket, key and uploadId func (az *Azure) checkIfMpExists(ctx context.Context, bucket, obj, uploadId string) error { tmpPath := createMetaTmpPath(obj, uploadId) From dc31696e5314554fa7e205696f680460bb7f2ca2 Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Sat, 28 Feb 2026 15:37:59 -0800 Subject: [PATCH 2/4] fix: azure ListBuckets pagination to use client-side continuation tokens Azure's ListContainers Marker parameter requires an opaque internal token (e.g. 
/accountname/containername) rather than a plain container name, so passing MaxResults and our ContinuationToken directly to the Azure API caused 400 OutOfRangeInput errors. Rework ListBuckets to iterate all Azure pages client-side, skip entries at or before the ContinuationToken (matching the posix backend's "start after" semantics), and stop once MaxBuckets items have been collected, setting ContinuationToken to the last returned bucket name. This avoids using Azure's NextMarker entirely and correctly handles both unpaginated and paginated requests. --- backend/azure/azure.go | 67 +++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/backend/azure/azure.go b/backend/azure/azure.go index bb6f420a..ecaaf91e 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -216,54 +216,61 @@ func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, } func (az *Azure) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) { - pager := az.client.NewListContainersPager( - &service.ListContainersOptions{ - Include: service.ListContainersInclude{ - Metadata: true, - }, - Marker: &input.ContinuationToken, - MaxResults: &input.MaxBuckets, - Prefix: &input.Prefix, - }) + opts := &service.ListContainersOptions{ + Include: service.ListContainersInclude{ + Metadata: true, + }, + Prefix: &input.Prefix, + } + pager := az.client.NewListContainersPager(opts) var buckets []s3response.ListAllMyBucketsEntry + var cToken string result := s3response.ListAllMyBucketsResult{ Prefix: input.Prefix, } - resp, err := pager.NextPage(ctx) - if err != nil { - return result, azureErrToS3Err(err) - } - for _, v := range resp.ContainerItems { - if input.IsAdmin { - buckets = append(buckets, s3response.ListAllMyBucketsEntry{ - Name: *v.Name, - // TODO: using modification date here instead of creation, is that ok? 
- CreationDate: *v.Properties.LastModified, - }) - } else { - acl, err := getAclFromMetadata(v.Metadata, keyAclLower) - if err != nil { - return result, err +outer: + for pager.More() { + resp, err := pager.NextPage(ctx) + if err != nil { + return result, azureErrToS3Err(err) + } + for _, v := range resp.ContainerItems { + // If we've already filled MaxBuckets, there is a next page — set token and stop + if input.MaxBuckets > 0 && int32(len(buckets)) == input.MaxBuckets { + cToken = buckets[len(buckets)-1].Name + break outer } - - if acl.Owner == input.Owner { + // Skip items at or before the continuation token (client-side "start after") + if input.ContinuationToken != "" && *v.Name <= input.ContinuationToken { + continue + } + if input.IsAdmin { buckets = append(buckets, s3response.ListAllMyBucketsEntry{ Name: *v.Name, // TODO: using modification date here instead of creation, is that ok? CreationDate: *v.Properties.LastModified, }) + } else { + acl, err := getAclFromMetadata(v.Metadata, keyAclLower) + if err != nil { + return result, err + } + if acl.Owner == input.Owner { + buckets = append(buckets, s3response.ListAllMyBucketsEntry{ + Name: *v.Name, + // TODO: using modification date here instead of creation, is that ok? + CreationDate: *v.Properties.LastModified, + }) + } } } } - if resp.NextMarker != nil { - result.ContinuationToken = *resp.NextMarker - } - result.Buckets.Bucket = buckets result.Owner.ID = input.Owner + result.ContinuationToken = cToken return result, nil } From 271313b03618fc15515bdeef3f9f4b130a6ffa1e Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Sat, 28 Feb 2026 16:04:37 -0800 Subject: [PATCH 3/4] fix: azure close download body in CopyObject to prevent resource leak When copying between two different Azure blobs, the source download stream body was only consumed by PutObject but never explicitly closed. 
If PutObject or any subsequent step returned an error, the underlying HTTP connection held by the Azure SDK was never released, leaking both the connection and any internal SDK retry goroutines attached to it. Added a deferred close on downloadResp.Body immediately after the successful DownloadStream call to ensure the body is always drained and released regardless of the outcome. --- backend/azure/azure.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/azure/azure.go b/backend/azure/azure.go index ecaaf91e..2efb015f 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -1042,6 +1042,7 @@ func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInpu if err != nil { return s3response.CopyObjectOutput{}, azureErrToS3Err(err) } + defer downloadResp.Body.Close() pInput := s3response.PutObjectInput{ Body: downloadResp.Body, From afbeb7cb6ed11fea532ee21943440759daf41151 Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Sat, 28 Feb 2026 16:14:46 -0800 Subject: [PATCH 4/4] fix: azure modernize part number loop check The part number loop check can be simplified with slices.Contains. --- backend/azure/azure.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/backend/azure/azure.go b/backend/azure/azure.go index 2efb015f..e32e3c71 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -28,6 +28,7 @@ import ( "net/url" "os" "path/filepath" + "slices" "sort" "strconv" "strings" @@ -2225,13 +2226,7 @@ func (az *Azure) trackZeroBytePart(ctx context.Context, bucket, key, uploadId st // Deduplicate: replace an existing entry for the same partNumber. parts := parseZeroByteParts(meta) - found := false - for _, p := range parts { - if p == partNumber { - found = true - break - } - } + found := slices.Contains(parts, partNumber) if !found { parts = append(parts, partNumber) }