diff --git a/backend/azure/azure.go b/backend/azure/azure.go index a454a36..e32e3c7 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -28,6 +28,7 @@ import ( "net/url" "os" "path/filepath" + "slices" "sort" "strconv" "strings" @@ -69,6 +70,9 @@ const ( onameAttr key = "Objname" onameAttrLower key = "objname" metaTmpMultipartPrefix key = ".sgwtmp" + "/multipart" + // keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata. + // Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here. + keyMpZeroBytesParts key = "Zerobytesparts" defaultListingMaxKeys = 1000 ) @@ -213,54 +217,61 @@ func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, } func (az *Azure) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) { - pager := az.client.NewListContainersPager( - &service.ListContainersOptions{ - Include: service.ListContainersInclude{ - Metadata: true, - }, - Marker: &input.ContinuationToken, - MaxResults: &input.MaxBuckets, - Prefix: &input.Prefix, - }) + opts := &service.ListContainersOptions{ + Include: service.ListContainersInclude{ + Metadata: true, + }, + Prefix: &input.Prefix, + } + pager := az.client.NewListContainersPager(opts) var buckets []s3response.ListAllMyBucketsEntry + var cToken string result := s3response.ListAllMyBucketsResult{ Prefix: input.Prefix, } - resp, err := pager.NextPage(ctx) - if err != nil { - return result, azureErrToS3Err(err) - } - for _, v := range resp.ContainerItems { - if input.IsAdmin { - buckets = append(buckets, s3response.ListAllMyBucketsEntry{ - Name: *v.Name, - // TODO: using modification date here instead of creation, is that ok? - CreationDate: *v.Properties.LastModified, - }) - } else { - acl, err := getAclFromMetadata(v.Metadata, keyAclLower) - if err != nil { - return result, err +outer: + for pager.More() { + resp, err := pager.NextPage(ctx) + if err != nil { + return result, azureErrToS3Err(err) + } + for _, v := range resp.ContainerItems { + // If we've already filled MaxBuckets, there is a next page — set token and stop + if input.MaxBuckets > 0 && int32(len(buckets)) == input.MaxBuckets { + cToken = buckets[len(buckets)-1].Name + break outer } - - if acl.Owner == input.Owner { + // Skip items at or before the continuation token (client-side "start after") + if input.ContinuationToken != "" && *v.Name <= input.ContinuationToken { + continue + } + if input.IsAdmin { buckets = append(buckets, s3response.ListAllMyBucketsEntry{ Name: *v.Name, // TODO: using modification date here instead of creation, is that ok? CreationDate: *v.Properties.LastModified, }) + } else { + acl, err := getAclFromMetadata(v.Metadata, keyAclLower) + if err != nil { + return result, err + } + if acl.Owner == input.Owner { + buckets = append(buckets, s3response.ListAllMyBucketsEntry{ + Name: *v.Name, + // TODO: using modification date here instead of creation, is that ok? + CreationDate: *v.Properties.LastModified, + }) + } } } } - if resp.NextMarker != nil { - result.ContinuationToken = *resp.NextMarker - } - result.Buckets.Bucket = buckets result.Owner.ID = input.Owner + result.ContinuationToken = cToken return result, nil } @@ -575,7 +586,8 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3 retention, ok := resp.Metadata[string(keyObjRetention)] if ok { var config types.ObjectLockRetention - if err := json.Unmarshal([]byte(*retention), &config); err == nil { + err := json.Unmarshal([]byte(*retention), &config) + if err == nil { result.ObjectLockMode = types.ObjectLockMode(config.Mode) result.ObjectLockRetainUntilDate = config.RetainUntilDate } @@ -1031,6 +1043,7 @@ func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInpu if err != nil { return s3response.CopyObjectOutput{}, azureErrToS3Err(err) } + defer downloadResp.Body.Close() pInput := s3response.PutObjectInput{ Body: downloadResp.Body, @@ -1144,16 +1157,20 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input s3response.Cre } if len(bucketLock) == 0 { - return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) + return s3response.InitiateMultipartUploadResult{}, + s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) } var bucketLockConfig auth.BucketLockConfig - if err := json.Unmarshal(bucketLock, &bucketLockConfig); err != nil { - return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("parse bucket lock config: %w", err) + err = json.Unmarshal(bucketLock, &bucketLockConfig) + if err != nil { + return s3response.InitiateMultipartUploadResult{}, + fmt.Errorf("parse bucket lock config: %w", err) } if !bucketLockConfig.Enabled { - return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) + return s3response.InitiateMultipartUploadResult{}, + s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces) } } @@ -1221,7 +1238,8 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input s3response.Cre // Each part is translated into an uncommitted block in a newly created blob in staging area func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) { - if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil { + err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { return nil, err } @@ -1234,13 +1252,31 @@ func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3 return nil, err } + // block id serves as etag here + etag := blockIDInt32ToBase64(*input.PartNumber) + + // Azure StageBlock rejects Content-Length: 0 as an invalid header value. + // Track zero-byte parts in the sgwtmp metadata instead of staging them. + size, err := rdr.Seek(0, io.SeekEnd) + if err != nil { + return nil, err + } + if _, err = rdr.Seek(0, io.SeekStart); err != nil { + return nil, err + } + if size == 0 { + err := az.trackZeroBytePart(ctx, *input.Bucket, *input.Key, *input.UploadId, *input.PartNumber) + if err != nil { + return nil, err + } + return &s3.UploadPartOutput{ETag: &etag}, nil + } + client, err := az.getBlockBlobClient(*input.Bucket, *input.Key) if err != nil { return nil, err } - // block id serves as etag here - etag := blockIDInt32ToBase64(*input.PartNumber) _, err = client.StageBlock(ctx, etag, rdr, nil) if err != nil { return nil, parseMpError(err) @@ -1257,7 +1293,8 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp return s3response.CopyPartResult{}, err } - if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil { + err = az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { return s3response.CopyPartResult{}, err } @@ -1274,7 +1311,8 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp // Lists all uncommitted parts from the blob func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) { - if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil { + err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { return s3response.ListPartsResult{}, err } client, err := az.getBlockBlobClient(*input.Bucket, *input.Key) @@ -1290,49 +1328,66 @@ func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3res if *input.PartNumberMarker != "" { partNumberMarker, err = strconv.Atoi(*input.PartNumberMarker) if err != nil { - return s3response.ListPartsResult{}, s3err.GetInvalidMaxLimiterErr("part-number-marker") + return s3response.ListPartsResult{}, + s3err.GetInvalidMaxLimiterErr("part-number-marker") } } if input.MaxParts != nil { maxParts = *input.MaxParts } - resp, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil) - if err != nil { - // If the mp exists but the client returns 'NoSuchKey' error, return empty result - if errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) { - return s3response.ListPartsResult{ - Bucket: *input.Bucket, - Key: *input.Key, - PartNumberMarker: partNumberMarker, - IsTruncated: isTruncated, - MaxParts: int(maxParts), - StorageClass: types.StorageClassStandard, - }, nil + resp, blockListErr := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil) + if blockListErr != nil { + if !errors.Is(azureErrToS3Err(blockListErr), s3err.GetAPIError(s3err.ErrNoSuchKey)) { + return s3response.ListPartsResult{}, blockListErr } + // NoSuchKey means no blocks have been staged yet (possible if only zero-byte + // parts exist). Continue so we can still return those from metadata. } parts := []s3response.Part{} - for _, el := range resp.UncommittedBlocks { - partNumber, err := decodeBlockId(*el.Name) - if err != nil { - return s3response.ListPartsResult{}, err + if blockListErr == nil { + for _, el := range resp.UncommittedBlocks { + partNumber, err := decodeBlockId(*el.Name) + if err != nil { + return s3response.ListPartsResult{}, err + } + if partNumberMarker >= partNumber { + continue + } + parts = append(parts, s3response.Part{ + Size: *el.Size, + ETag: *el.Name, + PartNumber: partNumber, + LastModified: time.Now(), + }) } - if partNumberMarker >= partNumber { + } + + // Merge in zero-byte parts tracked in the sgwtmp metadata. + zbParts, _ := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId) + for _, zbPartNum := range zbParts { + if partNumberMarker >= int(zbPartNum) { continue } parts = append(parts, s3response.Part{ - Size: *el.Size, - ETag: *el.Name, - PartNumber: partNumber, + Size: 0, + ETag: blockIDInt32ToBase64(zbPartNum), + PartNumber: int(zbPartNum), LastModified: time.Now(), }) - if len(parts) >= int(maxParts) { - nextPartNumberMarker = partNumber - isTruncated = true - break - } } + + // Sort by part number and apply maxParts limit. + sort.Slice(parts, func(i, j int) bool { + return parts[i].PartNumber < parts[j].PartNumber + }) + if int32(len(parts)) > maxParts { + parts = parts[:maxParts] + nextPartNumberMarker = parts[len(parts)-1].PartNumber + isTruncated = true + } + return s3response.ListPartsResult{ Bucket: *input.Bucket, Key: *input.Key, @@ -1515,10 +1570,23 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete blockList, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil) if err != nil { - return res, "", azureErrToS3Err(err) + if !errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) { + return res, "", azureErrToS3Err(err) + } + // NoSuchKey: no blocks staged; only zero-byte parts may exist. } - if len(blockList.UncommittedBlocks) != len(input.MultipartUpload.Parts) { + // Collect zero-byte parts tracked in the sgwtmp metadata. + zbParts, err := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId) + if err != nil { + return res, "", err + } + zbPartsMap := make(map[int32]bool, len(zbParts)) + for _, p := range zbParts { + zbPartsMap[p] = true + } + + if len(blockList.UncommittedBlocks)+len(zbParts) != len(input.MultipartUpload.Parts) { return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) } @@ -1532,10 +1600,10 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete uncommittedBlocks[int32(ptNumber)] = el } - // The initialie values is the lower limit of partNumber: 0 + // The initial value is the lower limit of partNumber: 0 var totalSize int64 var partNumber int32 - last := len(blockList.UncommittedBlocks) - 1 + last := len(input.MultipartUpload.Parts) - 1 for i, part := range input.MultipartUpload.Parts { if part.PartNumber == nil { return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) @@ -1550,6 +1618,19 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete block, ok := uncommittedBlocks[*part.PartNumber] if !ok { + // Check if this is a tracked zero-byte part. + if zbPartsMap[*part.PartNumber] { + expectedETag := blockIDInt32ToBase64(*part.PartNumber) + if getString(part.ETag) != expectedETag { + return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) + } + // Non-last zero-byte parts violate the minimum part size. + if i < last { + return res, "", s3err.GetAPIError(s3err.ErrEntityTooSmall) + } + // Zero-byte parts contribute no data; skip adding to blockIds. + continue + } return res, "", s3err.GetAPIError(s3err.ErrInvalidPart) } @@ -1566,9 +1647,13 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete } if input.MpuObjectSize != nil && totalSize != *input.MpuObjectSize { - return res, "", s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize) + return res, "", + s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize) } + // Remove internal tracking keys from metadata before storing on the final blob. + delete(props.Metadata, string(keyMpZeroBytesParts)) + opts := &blockblob.CommitBlockListOptions{ Metadata: props.Metadata, Tags: parseAzTags(tags.BlobTagSet), @@ -1834,7 +1919,8 @@ func (az *Azure) isBucketObjectLockEnabled(ctx context.Context, bucket string) e } var bucketLockConfig auth.BucketLockConfig - if err := json.Unmarshal(cfg, &bucketLockConfig); err != nil { + err = json.Unmarshal(cfg, &bucketLockConfig) + if err != nil { return fmt.Errorf("parse bucket lock config: %w", err) } @@ -2119,6 +2205,77 @@ func createMetaTmpPath(obj, uploadId string) string { return filepath.Join(string(metaTmpMultipartPrefix), uploadId, fmt.Sprintf("%x", objNameSum)) } +// trackZeroBytePart records a zero-byte upload part in the sgwtmp metadata. +// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here. +func (az *Azure) trackZeroBytePart(ctx context.Context, bucket, key, uploadId string, partNumber int32) error { + tmpPath := createMetaTmpPath(key, uploadId) + blobClient, err := az.getBlobClient(bucket, tmpPath) + if err != nil { + return err + } + + props, err := blobClient.GetProperties(ctx, nil) + if err != nil { + return azureErrToS3Err(err) + } + + meta := props.Metadata + if meta == nil { + meta = map[string]*string{} + } + + // Deduplicate: replace an existing entry for the same partNumber. + parts := parseZeroByteParts(meta) + found := slices.Contains(parts, partNumber) + if !found { + parts = append(parts, partNumber) + } + + serialized := serializeZeroByteParts(parts) + meta[string(keyMpZeroBytesParts)] = &serialized + _, err = blobClient.SetMetadata(ctx, meta, nil) + return azureErrToS3Err(err) +} + +// getZeroByteParts returns the list of zero-byte parts tracked in the sgwtmp metadata. +func (az *Azure) getZeroByteParts(ctx context.Context, bucket, key, uploadId string) ([]int32, error) { + tmpPath := createMetaTmpPath(key, uploadId) + blobClient, err := az.getBlobClient(bucket, tmpPath) + if err != nil { + return nil, err + } + + props, err := blobClient.GetProperties(ctx, nil) + if err != nil { + return nil, azureErrToS3Err(err) + } + + return parseZeroByteParts(props.Metadata), nil +} + +func parseZeroByteParts(meta map[string]*string) []int32 { + val, ok := meta[string(keyMpZeroBytesParts)] + if !ok || val == nil || *val == "" { + return nil + } + var parts []int32 + for s := range strings.SplitSeq(*val, ",") { + n, err := strconv.ParseInt(strings.TrimSpace(s), 10, 32) + if err == nil { + parts = append(parts, int32(n)) + } + } + return parts +} + +func serializeZeroByteParts(parts []int32) string { + strs := make([]string, len(parts)) + for i, p := range parts { + strs[i] = strconv.Itoa(int(p)) + } + return strings.Join(strs, ",") +} + // Checks if the multipart upload existis with the given bucket, key and uploadId func (az *Azure) checkIfMpExists(ctx context.Context, bucket, obj, uploadId string) error { tmpPath := createMetaTmpPath(obj, uploadId)