Files
versitygw/backend/azure/azure.go
niksis02 f08f76fea4 feat: support x-amz-website-redirect-location
Integrate x-amz-website-redirect-location across object metadata flows so uploads, copies, multipart creation, HEAD, and GET preserve and return redirect locations, and website hosting applies object-level redirects from the stored value.
2026-06-10 12:41:55 +04:00

2625 lines
77 KiB
Go

// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package azure
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net/url"
"os"
"path/filepath"
"slices"
"sort"
"strconv"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/google/uuid"
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3err"
"github.com/versity/versitygw/s3response"
)
// key is the string type of the metadata key names and blob-name prefixes
// declared below.
//
// When getting container metadata with the GetProperties method the SDK returns
// keys with the first letter capitalized; when accessing the metadata after
// listing the containers it returns the first letter in lower case. That is why
// some keys are declared in both spellings.
type key string

const (
	// keyAclCapital is the container metadata key holding the bucket ACL
	// (written by CreateBucket).
	keyAclCapital key = "Acl"
	// keyAclLower is the same ACL key as it appears in the metadata returned
	// by container listings (read by ListBuckets).
	keyAclLower key = "acl"
	// keyOwnership is the container metadata key holding the bucket's
	// object-ownership setting.
	keyOwnership key = "Ownership"
	// keyTags is the container metadata key holding the JSON-encoded bucket tags.
	keyTags key = "Tags"
	// NOTE(review): keyPolicy, keyCors and keyWebsite are not used in the part
	// of the file reviewed here; presumably they hold the bucket policy, CORS
	// and website configuration — confirm against their users.
	keyPolicy  key = "Policy"
	keyCors    key = "Cors"
	keyWebsite key = "Website"
	// keyBucketLock is the container metadata key holding the JSON-encoded
	// bucket object-lock configuration (written by CreateBucket).
	keyBucketLock key = "Bucketlock"
	// keyObjRetention is the blob metadata key holding the JSON-encoded
	// object-lock retention (read by HeadObject).
	keyObjRetention key = "Objectretention"
	// keyObjLegalHold is the blob metadata key holding the legal hold flag;
	// HeadObject treats the value "1" as "on".
	keyObjLegalHold key = "Objectlegalhold"
	// keyExpires is the blob metadata key where PutObject stores the
	// "Expires" value of the object.
	keyExpires key = "Vgwexpires"
	// keyWebsiteRedirect is the blob metadata key where PutObject stores the
	// x-amz-website-redirect-location value of the object.
	keyWebsiteRedirect key = "Vgwwebsiteredirect"
	// NOTE(review): onameAttr / onameAttrLower are not used in the part of the
	// file reviewed here; presumably an object-name attribute in both
	// spellings — confirm against their users.
	onameAttr      key = "Objname"
	onameAttrLower key = "objname"
	// metaTmpMultipartPrefix is the blob-name prefix of temporary multipart
	// upload blobs; listings and the bucket-empty check skip names with it.
	metaTmpMultipartPrefix key = ".sgwtmp" + "/multipart"

	// keyMpZeroBytesParts tracks zero-byte upload parts in the sgwtmp metadata.
	// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored here.
	keyMpZeroBytesParts key = "Zerobytesparts"

	// keyMpMetadata stores multipart upload part-offset metadata on the final
	// committed blob so that GetObject/HeadObject can serve individual parts
	// by part-number.
	keyMpMetadata key = "Mpmetadata"

	// defaultListingMaxKeys is the page size used by ListObjects and
	// ListObjectsV2 when the request does not set MaxKeys.
	defaultListingMaxKeys = 1000
)
// Table returns a newly allocated set of lower-case key names. Most entries
// are the lower-cased spellings of key constants declared in this file.
//
// NOTE(review): presumably callers use this set to recognise gateway-internal
// metadata keys — confirm against the callers. keyCors and keyMpZeroBytesParts
// have no lower-cased entry here; confirm whether that is intentional.
func (key) Table() map[string]struct{} {
	names := [...]string{
		"acl",
		"ownership",
		"tags",
		"policy",
		"bucketlock",
		"website",
		"objectretention",
		"vgwexpires",
		"vgwwebsiteredirect",
		"objectlegalhold",
		"objname",
		".sgwtmp/multipart",
		"mpmetadata",
	}

	set := make(map[string]struct{}, len(names))
	for _, name := range names {
		set[name] = struct{}{}
	}
	return set
}
// Azure is the gateway backend that stores buckets as Azure Blob containers
// and objects as blobs. It embeds backend.BackendUnsupported and overrides
// the operations implemented in this file.
type Azure struct {
	backend.BackendUnsupported

	// client is the account-level Azure Blob client created by New.
	client *azblob.Client
	// sharedkeyCreds is set only when New authenticates with an account
	// name and account key.
	sharedkeyCreds *azblob.SharedKeyCredential
	// defaultCreds is set only when New falls back to the default Azure
	// credential chain.
	defaultCreds *azidentity.DefaultAzureCredential
	// serviceURL is the service endpoint recorded by New.
	serviceURL string
	// sasToken is set only when New authenticates with a SAS token.
	sasToken string
	// copyObjectThreshold is the size limit CopyObject compares the source
	// object's content length against.
	copyObjectThreshold int64
}

// Compile-time check that *Azure satisfies backend.Backend.
var _ backend.Backend = &Azure{}
// New creates an Azure backend.
//
// The service endpoint is serviceURL when given; otherwise, when accountName
// is set, it defaults to https://<account>.blob.core.windows.net/.
//
// Authentication is selected in this order:
//  1. sasToken, when non-empty (client without credentials, token appended
//     to the endpoint as a query string);
//  2. the default Azure credential chain, when the account name (falling
//     back to the AZURE_CLIENT_ID environment variable) or the account key
//     is missing;
//  3. shared key credentials built from accountName and accountKey.
//
// copyObjectThreshold is stored on the backend unchanged.
func New(accountName, accountKey, serviceURL, sasToken string, copyObjectThreshold int64) (*Azure, error) {
	endpoint := serviceURL
	if serviceURL == "" && accountName != "" {
		// if not otherwise specified, use the typical form:
		// http(s)://<account>.blob.core.windows.net/
		endpoint = fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
	}

	if sasToken != "" {
		client, err := azblob.NewClientWithNoCredential(endpoint+"?"+sasToken, nil)
		if err != nil {
			return nil, fmt.Errorf("init client: %w", err)
		}
		return &Azure{
			client: client,
			// Record the endpoint the client was actually built with, as
			// the other branches do, so it is not left empty when only an
			// account name was supplied.
			serviceURL:          endpoint,
			sasToken:            sasToken,
			copyObjectThreshold: copyObjectThreshold,
		}, nil
	}

	if accountName == "" {
		// if account name not provided, try to get from env var
		accountName = os.Getenv("AZURE_CLIENT_ID")
	}

	if accountName == "" || accountKey == "" {
		cred, err := azidentity.NewDefaultAzureCredential(nil)
		if err != nil {
			return nil, fmt.Errorf("init default credentials: %w", err)
		}
		client, err := azblob.NewClient(endpoint, cred, nil)
		if err != nil {
			return nil, fmt.Errorf("init client: %w", err)
		}
		return &Azure{
			client:              client,
			serviceURL:          endpoint,
			defaultCreds:        cred,
			copyObjectThreshold: copyObjectThreshold,
		}, nil
	}

	cred, err := azblob.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		return nil, fmt.Errorf("init credentials: %w", err)
	}
	client, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil)
	if err != nil {
		return nil, fmt.Errorf("init client: %w", err)
	}
	return &Azure{
		client:              client,
		serviceURL:          endpoint,
		sharedkeyCreds:      cred,
		copyObjectThreshold: copyObjectThreshold,
	}, nil
}
// Shutdown is a no-op for the Azure backend.
func (az *Azure) Shutdown() {}
// String returns the human-readable name of this backend.
func (az *Azure) String() string {
	return "Azure Blob Gateway"
}
// CreateBucket creates an Azure container for the bucket and stores the
// bucket ACL, object ownership, optional default object-lock configuration
// and optional bucket tags in the container metadata.
//
// When the container already exists, the stored ACL is consulted: if its
// owner matches the "bucket-owner" account from ctx the result is
// ErrBucketAlreadyOwnedByYou, otherwise ErrBucketAlreadyExists.
func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, acl []byte) error {
	meta := map[string]*string{
		string(keyAclCapital): backend.GetPtrFromString(encodeBytes(acl)),
		string(keyOwnership):  backend.GetPtrFromString(encodeBytes([]byte(input.ObjectOwnership))),
	}

	// A missing or mistyped context value leaves the zero account, which
	// then simply never matches the existing owner below.
	acct, ok := ctx.Value("bucket-owner").(auth.Account)
	if !ok {
		acct = auth.Account{}
	}

	if input.ObjectLockEnabledForBucket != nil && *input.ObjectLockEnabledForBucket {
		now := time.Now()
		defaultLock := auth.BucketLockConfig{
			Enabled:   true,
			CreatedAt: &now,
		}

		defaultLockParsed, err := json.Marshal(defaultLock)
		if err != nil {
			return fmt.Errorf("parse default bucket lock state: %w", err)
		}

		meta[string(keyBucketLock)] = backend.GetPtrFromString(encodeBytes(defaultLockParsed))
	}

	// The bucket configuration is optional; treat an absent one like an
	// empty configuration instead of dereferencing a nil pointer.
	cfg := input.CreateBucketConfiguration
	if cfg == nil {
		cfg = &types.CreateBucketConfiguration{}
	}
	tagging, err := backend.ParseCreateBucketTags(cfg.Tags)
	if err != nil {
		return err
	}
	if tagging != nil {
		tags, err := json.Marshal(tagging)
		if err != nil {
			return fmt.Errorf("marshal tags: %w", err)
		}
		meta[string(keyTags)] = backend.GetPtrFromString(encodeBytes(tags))
	}

	_, err = az.client.CreateContainer(ctx, *input.Bucket, &container.CreateOptions{Metadata: meta})
	createErr := azureErrToS3Err(err)
	if errors.Is(createErr, s3err.GetAPIError(s3err.ErrBucketAlreadyExists)) {
		// Distinguish "already owned by you" from "owned by someone else"
		// using the owner recorded in the existing container's ACL.
		aclBytes, err := az.getContainerMetaData(ctx, *input.Bucket, string(keyAclCapital))
		if err != nil {
			return err
		}

		existingACL, err := auth.ParseACL(aclBytes)
		if err != nil {
			return err
		}

		if existingACL.Owner == acct.Access {
			return s3err.GetBucketErr(s3err.ErrBucketAlreadyOwnedByYou, *input.Bucket)
		}
		return s3err.GetBucketErr(s3err.ErrBucketAlreadyExists, *input.Bucket)
	}

	return createErr
}
// ListBuckets lists the containers whose names start with input.Prefix.
//
// Admin callers see every container; other callers only see containers whose
// stored ACL owner equals input.Owner. Pagination is done client-side:
// containers with a name at or before input.ContinuationToken are skipped,
// and once input.MaxBuckets (when > 0) entries are collected and another
// container is encountered, the name of the last collected bucket is returned
// as the continuation token.
func (az *Azure) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
	result := s3response.ListAllMyBucketsResult{
		Prefix: input.Prefix,
	}

	pager := az.client.NewListContainersPager(&service.ListContainersOptions{
		Include: service.ListContainersInclude{
			Metadata: true,
		},
		Prefix: &input.Prefix,
	})

	var (
		entries   []s3response.ListAllMyBucketsEntry
		nextToken string
	)

pages:
	for pager.More() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return result, azureErrToS3Err(err)
		}

		for _, cntr := range page.ContainerItems {
			// The page is already full and there is at least one more
			// container: hand out a token and stop.
			if input.MaxBuckets > 0 && int32(len(entries)) == input.MaxBuckets {
				nextToken = entries[len(entries)-1].Name
				break pages
			}

			// Client-side "start after" for the continuation token.
			if input.ContinuationToken != "" && *cntr.Name <= input.ContinuationToken {
				continue
			}

			// Non-admin callers only get the buckets they own.
			if !input.IsAdmin {
				acl, err := getAclFromMetadata(cntr.Metadata, keyAclLower)
				if err != nil {
					return result, err
				}
				if acl.Owner != input.Owner {
					continue
				}
			}

			entries = append(entries, s3response.ListAllMyBucketsEntry{
				Name: *cntr.Name,
				// TODO: using modification date here instead of creation, is that ok?
				CreationDate: *cntr.Properties.LastModified,
			})
		}
	}

	result.Buckets.Bucket = entries
	result.Owner.ID = input.Owner
	result.ContinuationToken = nextToken

	return result, nil
}
// HeadBucket reports whether the bucket is reachable by requesting its
// container metadata; the metadata value itself is discarded. Any error is
// passed through azureErrToS3Err.
func (az *Azure) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
	if _, err := az.getContainerMetaData(ctx, *input.Bucket, "any"); err != nil {
		return nil, azureErrToS3Err(err)
	}

	return &s3.HeadBucketOutput{}, nil
}
// DeleteBucket deletes the container backing the bucket. It first walks the
// flat blob listing and returns ErrBucketNotEmpty as soon as it finds a blob
// that is not a temporary multipart blob.
func (az *Azure) DeleteBucket(ctx context.Context, bucket string) error {
	tmpPrefix := string(metaTmpMultipartPrefix)

	for pager := az.client.NewListBlobsFlatPager(bucket, nil); pager.More(); {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return azureErrToS3Err(err)
		}

		for _, blobItem := range page.Segment.BlobItems {
			// temp multipart objects do not count as bucket content
			if strings.HasPrefix(backend.GetStringFromPtr(blobItem.Name), tmpPrefix) {
				continue
			}
			return s3err.GetAPIError(s3err.ErrBucketNotEmpty)
		}
	}

	_, err := az.client.DeleteContainer(ctx, bucket, nil)
	return azureErrToS3Err(err)
}
// PutBucketOwnershipControls stores the object-ownership setting in the
// container metadata under keyOwnership.
func (az *Azure) PutBucketOwnershipControls(ctx context.Context, bucket string, ownership types.ObjectOwnership) error {
	value := []byte(ownership)
	return az.setContainerMetaData(ctx, bucket, string(keyOwnership), value)
}
// GetBucketOwnershipControls reads the object-ownership setting from the
// container metadata. An empty stored value yields
// ErrOwnershipControlsNotFound for the bucket.
func (az *Azure) GetBucketOwnershipControls(ctx context.Context, bucket string) (types.ObjectOwnership, error) {
	raw, err := az.getContainerMetaData(ctx, bucket, string(keyOwnership))
	switch {
	case err != nil:
		return "", err
	case len(raw) == 0:
		return "", s3err.GetBucketErr(s3err.ErrOwnershipControlsNotFound, bucket)
	}

	return types.ObjectOwnership(raw), nil
}
// DeleteBucketOwnershipControls removes the keyOwnership entry from the
// container metadata of the bucket.
func (az *Azure) DeleteBucketOwnershipControls(ctx context.Context, bucket string) error {
	return az.deleteContainerMetaData(ctx, bucket, string(keyOwnership))
}
// PutObject uploads the request body as a block blob.
//
// Steps, in order:
//  1. parse the object tags and evaluate the If-Match / If-None-Match write
//     preconditions;
//  2. build the blob metadata from the user metadata, adding the "Expires"
//     and website-redirect values under gateway keys when they are set;
//  3. upload the stream with the HTTP headers of the request (keys ending in
//     "/" always get the directory content type);
//  4. apply legal hold and retention, if requested, after the upload.
//
// A missing object-lock configuration reported by step 4 is mapped to
// ErrMissingObjectLockConfigurationNoSpaces. Note that the blob has already
// been uploaded when step 4 fails.
func (az *Azure) PutObject(ctx context.Context, po s3response.PutObjectInput) (s3response.PutObjectOutput, error) {
	tags, err := backend.ParseObjectTags(getString(po.Tagging))
	if err != nil {
		return s3response.PutObjectOutput{}, err
	}

	err = az.evaluateWritePreconditions(ctx, po.Bucket, po.Key, po.IfMatch, po.IfNoneMatch)
	if err != nil {
		return s3response.PutObjectOutput{}, err
	}

	metadata := parseMetadata(po.Metadata)

	// setMeta stores value under name unless it is nil or empty, allocating
	// the metadata map on first use.
	setMeta := func(name string, value *string) {
		if getString(value) == "" {
			return
		}
		if metadata == nil {
			metadata = map[string]*string{}
		}
		metadata[name] = value
	}
	// Store the "Expires" property in the object metadata
	setMeta(string(keyExpires), po.Expires)
	// Store the website redirect location in the object metadata
	setMeta(string(keyWebsiteRedirect), po.WebsiteRedirectLocation)

	contentType := po.ContentType
	if strings.HasSuffix(*po.Key, "/") {
		// Hardcode "application/x-directory" for directory objects
		contentType = backend.GetPtrFromString(backend.DirContentType)
	}

	opts := &blockblob.UploadStreamOptions{
		Metadata: metadata,
		Tags:     tags,
		HTTPHeaders: &blob.HTTPHeaders{
			BlobContentEncoding:    po.ContentEncoding,
			BlobContentLanguage:    po.ContentLanguage,
			BlobContentDisposition: po.ContentDisposition,
			BlobCacheControl:       po.CacheControl,
			BlobContentType:        contentType,
		},
	}

	uploadResp, err := az.client.UploadStream(ctx, *po.Bucket, *po.Key, po.Body, opts)
	if err != nil {
		return s3response.PutObjectOutput{}, azureErrToS3Err(err)
	}

	// Set object legal hold
	if po.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
		err := az.PutObjectLegalHold(ctx, *po.Bucket, *po.Key, "", true)
		if err != nil {
			if errors.Is(err, s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)) {
				err = s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
			}
			return s3response.PutObjectOutput{}, err
		}
	}

	// Set object retention
	if po.ObjectLockMode != "" {
		retention := types.ObjectLockRetention{
			Mode:            types.ObjectLockRetentionMode(po.ObjectLockMode),
			RetainUntilDate: po.ObjectLockRetainUntilDate,
		}
		retParsed, err := json.Marshal(retention)
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("parse object lock retention: %w", err)
		}
		err = az.PutObjectRetention(ctx, *po.Bucket, *po.Key, "", retParsed)
		if err != nil {
			if errors.Is(err, s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)) {
				err = s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
			}
			return s3response.PutObjectOutput{}, err
		}
	}

	return s3response.PutObjectOutput{
		ETag: convertAzureEtag(uploadResp.ETag),
		Size: po.ContentLength,
	}, nil
}
// PutBucketTagging stores tags as JSON in the container metadata under
// keyTags. A nil map removes the stored tags instead.
func (az *Azure) PutBucketTagging(ctx context.Context, bucket string, tags map[string]string) error {
	if tags != nil {
		encoded, err := json.Marshal(tags)
		if err != nil {
			return err
		}
		return az.setContainerMetaData(ctx, bucket, string(keyTags), encoded)
	}

	return az.deleteContainerMetaData(ctx, bucket, string(keyTags))
}
// GetBucketTagging reads the JSON-encoded bucket tags from the container
// metadata and decodes them. An empty stored value yields
// ErrBucketTaggingNotFound for the bucket.
func (az *Azure) GetBucketTagging(ctx context.Context, bucket string) (map[string]string, error) {
	raw, err := az.getContainerMetaData(ctx, bucket, string(keyTags))
	if err != nil {
		return nil, err
	}
	if len(raw) == 0 {
		return nil, s3err.GetBucketErr(s3err.ErrBucketTaggingNotFound, bucket)
	}

	var decoded map[string]string
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}
// DeleteBucketTagging removes the bucket tags by calling PutBucketTagging
// with a nil tag map.
func (az *Azure) DeleteBucketTagging(ctx context.Context, bucket string) error {
	return az.PutBucketTagging(ctx, bucket, nil)
}
// GetObject returns the object's data and properties.
//
// The blob properties are fetched first to evaluate the conditional request
// headers and to learn the object size. The byte window to download is then
// chosen as follows:
//   - PartNumber set, object has multipart metadata: the byte range of that
//     part, with PartsCount and Content-Range set. Part numbers outside
//     [1, number of parts] yield an invalid part number range error.
//   - PartNumber set, no multipart metadata: part 1 is the whole object;
//     any larger part number is out of range.
//   - Range set and non-empty: the parsed range, when it is valid.
//   - otherwise: the whole object.
func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
	client, err := az.getBlobClient(*input.Bucket, *input.Key)
	if err != nil {
		return nil, err
	}

	resp, err := client.GetProperties(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	if resp.ETag != nil && resp.LastModified != nil {
		err = backend.EvaluatePreconditions(convertAzureEtag(resp.ETag), *resp.LastModified,
			backend.PreConditions{
				IfMatch:       input.IfMatch,
				IfNoneMatch:   input.IfNoneMatch,
				IfModSince:    input.IfModifiedSince,
				IfUnmodeSince: input.IfUnmodifiedSince,
			})
		if err != nil {
			return nil, err
		}
	}

	var objSize int64
	if resp.ContentLength != nil {
		objSize = *resp.ContentLength
	}

	var opts *azblob.DownloadStreamOptions
	var partsCount *int32
	var contentRange *string

	if input.PartNumber != nil {
		partNum := *input.PartNumber

		// Serve a specific part if the object has multipart upload metadata.
		// For non-multipart objects (no mp-metadata), partNumber=1 returns the
		// full object; any other partNumber is out of range.
		if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
			mpMeta, err := backend.UnmarshalMpUploadMetadata([]byte(*mpMetaStr), true)
			if err != nil {
				return nil, fmt.Errorf("parse object multipart metadata: %w", err)
			}

			totalParts := int32(len(mpMeta.Parts))
			partsCount = &totalParts

			// Reject part numbers below 1 as well: they would index
			// mpMeta.Parts out of range below.
			if partNum < 1 || partNum > totalParts {
				return nil, s3err.GetInvalidPartNumberRangeErr(totalParts, partNum)
			}

			// mpMeta.Parts[i] is used as the end offset of part i+1, so a
			// part starts where the previous one ended.
			var startOffset int64
			if partNum > 1 {
				startOffset = mpMeta.Parts[partNum-2]
			}
			length := mpMeta.Parts[partNum-1] - startOffset

			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize))
			opts = &azblob.DownloadStreamOptions{
				Range: blob.HTTPRange{
					Offset: startOffset,
					Count:  length,
				},
			}
		} else if partNum > 1 {
			return nil, s3err.GetInvalidPartNumberRangeErr(1, partNum)
		} else if objSize != 0 {
			// partNumber=1 on a non-multipart object: serve the full object
			// without a download range (opts remains nil). For a zero-size
			// object no content range is set.
			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes 0-%d/%d", objSize-1, objSize))
		}
	} else if input.Range != nil && *input.Range != "" {
		// The nil check keeps a request without a Range from panicking.
		offset, count, isValid, err := backend.ParseObjectRange(objSize, *input.Range)
		if err != nil {
			return nil, err
		}
		if isValid {
			opts = &azblob.DownloadStreamOptions{
				Range: blob.HTTPRange{
					Count:  count,
					Offset: offset,
				},
			}
			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", offset, offset+count-1, objSize))
		}
	}

	blobDownloadResponse, err := az.client.DownloadStream(ctx, *input.Bucket, *input.Key, opts)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	var tagcount int32
	if blobDownloadResponse.TagCount != nil {
		tagcount = int32(*blobDownloadResponse.TagCount)
	}

	return &s3.GetObjectOutput{
		AcceptRanges:            backend.GetPtrFromString("bytes"),
		ContentLength:           blobDownloadResponse.ContentLength,
		ContentEncoding:         blobDownloadResponse.ContentEncoding,
		ContentType:             blobDownloadResponse.ContentType,
		ContentDisposition:      blobDownloadResponse.ContentDisposition,
		ContentLanguage:         blobDownloadResponse.ContentLanguage,
		CacheControl:            blobDownloadResponse.CacheControl,
		ExpiresString:           blobDownloadResponse.Metadata[string(keyExpires)],
		WebsiteRedirectLocation: blobDownloadResponse.Metadata[string(keyWebsiteRedirect)],
		ETag:                    backend.GetPtrFromString(convertAzureEtag(blobDownloadResponse.ETag)),
		LastModified:            blobDownloadResponse.LastModified,
		Metadata:                parseAndFilterAzMetadata(blobDownloadResponse.Metadata),
		TagCount:                &tagcount,
		ContentRange:            contentRange,
		Body:                    blobDownloadResponse.Body,
		StorageClass:            types.StorageClassStandard,
		PartsCount:              partsCount,
	}, nil
}
// HeadObject returns the object's properties without its data.
//
// The conditional request headers are evaluated against the blob's ETag and
// last-modified time. ContentLength and ContentRange describe the byte window
// selected by PartNumber or Range:
//   - PartNumber set, object has multipart metadata: the size and range of
//     that part, with PartsCount set. Part numbers outside
//     [1, number of parts] yield an invalid part number range error.
//   - PartNumber set, no multipart metadata: part 1 is the whole object;
//     any larger part number is out of range.
//   - otherwise: whatever backend.ParseObjectRange returns for the Range
//     header (which may be unset).
//
// Legal hold and retention are reported from the blob metadata when present.
func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
	client, err := az.getBlobClient(*input.Bucket, *input.Key)
	if err != nil {
		return nil, err
	}

	resp, err := client.GetProperties(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	if resp.ETag != nil && resp.LastModified != nil {
		err = backend.EvaluatePreconditions(convertAzureEtag(resp.ETag), *resp.LastModified,
			backend.PreConditions{
				IfMatch:       input.IfMatch,
				IfNoneMatch:   input.IfNoneMatch,
				IfModSince:    input.IfModifiedSince,
				IfUnmodeSince: input.IfUnmodifiedSince,
			})
		if err != nil {
			return nil, err
		}
	}

	var size int64
	if resp.ContentLength != nil {
		size = *resp.ContentLength
	}

	var contentRange *string
	var length int64
	var partsCount *int32

	if input.PartNumber != nil {
		partNum := *input.PartNumber

		// Describe a specific part if the object has multipart upload metadata.
		// For non-multipart objects (no mp-metadata), partNumber=1 describes the
		// full object; any other partNumber is out of range.
		if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
			mpMeta, err := backend.UnmarshalMpUploadMetadata([]byte(*mpMetaStr), true)
			if err != nil {
				return nil, fmt.Errorf("parse object multipart metadata: %w", err)
			}

			totalParts := int32(len(mpMeta.Parts))
			partsCount = &totalParts

			// Reject part numbers below 1 as well: they would index
			// mpMeta.Parts out of range below.
			if partNum < 1 || partNum > totalParts {
				return nil, s3err.GetInvalidPartNumberRangeErr(totalParts, partNum)
			}

			// mpMeta.Parts[i] is used as the end offset of part i+1, so a
			// part starts where the previous one ended.
			var startOffset int64
			if partNum > 1 {
				startOffset = mpMeta.Parts[partNum-2]
			}
			length = mpMeta.Parts[partNum-1] - startOffset

			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size))
		} else if partNum > 1 {
			return nil, s3err.GetInvalidPartNumberRangeErr(1, partNum)
		} else {
			// partNumber=1 on a non-multipart object: report the full object
			// size and no PartsCount. For a zero-size object no content
			// range is set.
			length = size
			if length != 0 {
				contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes 0-%d/%d", size-1, size))
			}
		}
	} else {
		startOffset, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
		if err != nil {
			return nil, err
		}

		length = lgth
		if isValid {
			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v",
				startOffset, startOffset+length-1, size))
		}
	}

	result := &s3.HeadObjectOutput{
		ContentRange:            contentRange,
		AcceptRanges:            backend.GetPtrFromString("bytes"),
		ContentLength:           &length,
		PartsCount:              partsCount,
		ContentType:             resp.ContentType,
		ContentEncoding:         resp.ContentEncoding,
		ContentLanguage:         resp.ContentLanguage,
		ContentDisposition:      resp.ContentDisposition,
		CacheControl:            resp.CacheControl,
		ExpiresString:           resp.Metadata[string(keyExpires)],
		WebsiteRedirectLocation: resp.Metadata[string(keyWebsiteRedirect)],
		ETag:                    backend.GetPtrFromString(convertAzureEtag(resp.ETag)),
		LastModified:            resp.LastModified,
		Metadata:                parseAndFilterAzMetadata(resp.Metadata),
		StorageClass:            types.StorageClassStandard,
	}

	// Metadata values are pointers; skip entries with a nil value instead of
	// dereferencing them.
	if status, ok := resp.Metadata[string(keyObjLegalHold)]; ok && status != nil {
		if *status == "1" {
			result.ObjectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOn
		} else {
			result.ObjectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOff
		}
	}

	if retention, ok := resp.Metadata[string(keyObjRetention)]; ok && retention != nil {
		var config types.ObjectLockRetention
		// Retention that cannot be decoded is deliberately not reported
		// rather than failing the whole request.
		if err := json.Unmarshal([]byte(*retention), &config); err == nil {
			result.ObjectLockMode = types.ObjectLockMode(config.Mode)
			result.ObjectLockRetainUntilDate = config.RetainUntilDate
		}
	}

	if resp.TagCount != nil {
		tagcount := int32(*resp.TagCount)
		result.TagCount = &tagcount
	}

	return result, nil
}
// GetObjectAttributes builds the attributes response from a HeadObject call
// on the same bucket and key, copying ETag, size, storage class,
// last-modified time, version id and delete-marker flag.
func (az *Azure) GetObjectAttributes(ctx context.Context, input *s3.GetObjectAttributesInput) (s3response.GetObjectAttributesResponse, error) {
	headInput := &s3.HeadObjectInput{
		Bucket: input.Bucket,
		Key:    input.Key,
	}

	head, err := az.HeadObject(ctx, headInput)
	if err != nil {
		return s3response.GetObjectAttributesResponse{}, err
	}

	var attrs s3response.GetObjectAttributesResponse
	attrs.ETag = head.ETag
	attrs.ObjectSize = head.ContentLength
	attrs.StorageClass = head.StorageClass
	attrs.LastModified = head.LastModified
	attrs.VersionId = head.VersionId
	attrs.DeleteMarker = head.DeleteMarker

	return attrs, nil
}
// ListObjects implements the S3 ListObjects (v1) listing on top of a flat
// Azure blob listing.
//
// Azure is only given the prefix and marker; delimiter handling, common
// prefix de-duplication, the MaxKeys limit and truncation detection are done
// client-side. Temporary multipart blobs are never listed. Every object is
// reported as owned by the bucket owner taken from the bucket ACL.
//
// The listing is truncated only when at least one more entry exists after
// MaxKeys entries were collected; NextMarker is then the last returned key
// or common prefix, and empty otherwise.
func (az *Azure) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) {
	// Retrieve the bucket acl to get the bucket owner
	// All the objects in the bucket are owned by the bucket owner
	aclBytes, err := az.getContainerMetaData(ctx, *input.Bucket, string(keyAclCapital))
	if err != nil {
		return s3response.ListObjectsResult{}, azureErrToS3Err(err)
	}

	acl, err := auth.ParseACL(aclBytes)
	if err != nil {
		return s3response.ListObjectsResult{}, err
	}

	client, err := az.getContainerClient(*input.Bucket)
	if err != nil {
		// Report the failure instead of returning an empty, successful listing.
		return s3response.ListObjectsResult{}, err
	}

	var maxKeys int32 = defaultListingMaxKeys
	if input.MaxKeys != nil {
		maxKeys = *input.MaxKeys
	}

	delimiter := backend.GetStringFromPtr(input.Delimiter)
	prefix := backend.GetStringFromPtr(input.Prefix)
	effectiveMarker := backend.GetStringFromPtr(input.Marker)

	if maxKeys == 0 {
		// Nothing can be returned; skip the listing entirely.
		isFalse := false
		return s3response.ListObjectsResult{
			IsTruncated:    &isFalse,
			MaxKeys:        &maxKeys,
			Name:           input.Bucket,
			Prefix:         backend.GetPtrFromString(prefix),
			Marker:         backend.GetPtrFromString(effectiveMarker),
			Delimiter:      backend.GetPtrFromString(delimiter),
			CommonPrefixes: []types.CommonPrefix{},
		}, nil
	}

	// Use flat listing (empty delimiter) and handle delimiter logic client-side,
	// matching S3 semantics. Only pass Prefix and Marker to Azure.
	pager := client.NewListBlobsHierarchyPager("", &container.ListBlobsHierarchyOptions{
		Prefix: input.Prefix,
		Marker: input.Marker,
	})

	var objects []s3response.Object
	var cPrefixes []types.CommonPrefix
	cpSet := make(map[string]struct{})

	// pastMax is set once maxKeys entries were collected; the next entry
	// that would have been added proves that the listing is truncated.
	var pastMax, isTruncated bool
	var candidateMarker string
	var totalFound int32

loop:
	for pager.More() {
		resp, err := pager.NextPage(ctx)
		if err != nil {
			return s3response.ListObjectsResult{}, azureErrToS3Err(err)
		}

		for _, v := range resp.Segment.BlobItems {
			name := backend.GetStringFromPtr(v.Name)

			// Filter out multipart upload blobs
			if strings.HasPrefix(name, string(metaTmpMultipartPrefix)) {
				continue
			}

			// Apply delimiter logic to determine if this blob contributes to
			// a common prefix or is a regular object
			isCP := false
			cpKey := ""
			if delimiter != "" {
				suffix := strings.TrimPrefix(name, prefix)
				if before, _, found := strings.Cut(suffix, delimiter); found {
					isCP = true
					cpKey = prefix + before + delimiter
				}
			}

			if isCP {
				// Skip common prefixes at or before the marker
				if cpKey <= effectiveMarker {
					continue
				}
				// Deduplicate: multiple blobs can map to the same common prefix
				if _, exists := cpSet[cpKey]; exists {
					continue
				}
				// If we already reached maxKeys, this new unique CP means truncation
				if pastMax {
					isTruncated = true
					break loop
				}

				cp := cpKey
				cPrefixes = append(cPrefixes, types.CommonPrefix{Prefix: &cp})
				cpSet[cpKey] = struct{}{}
				candidateMarker = cpKey
			} else {
				if pastMax {
					isTruncated = true
					break loop
				}

				objects = append(objects, s3response.Object{
					ETag:         backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
					Key:          v.Name,
					LastModified: v.Properties.LastModified,
					Size:         v.Properties.ContentLength,
					StorageClass: types.ObjectStorageClassStandard,
					Owner: &types.Owner{
						ID: &acl.Owner,
					},
				})
				candidateMarker = name
			}

			totalFound++
			if totalFound == maxKeys {
				pastMax = true
			}
		}
	}

	// A next marker is only meaningful for a truncated listing.
	if !isTruncated {
		candidateMarker = ""
	}

	return s3response.ListObjectsResult{
		Contents:       objects,
		Marker:         backend.GetPtrFromString(effectiveMarker),
		MaxKeys:        &maxKeys,
		Name:           input.Bucket,
		NextMarker:     backend.GetPtrFromString(candidateMarker),
		Prefix:         backend.GetPtrFromString(prefix),
		IsTruncated:    &isTruncated,
		Delimiter:      backend.GetPtrFromString(delimiter),
		CommonPrefixes: cPrefixes,
	}, nil
}
// ListObjectsV2 implements the S3 ListObjectsV2 listing on top of a flat
// Azure blob listing.
//
// The listing starts after the lexicographically larger of StartAfter and
// ContinuationToken. Azure is only given the prefix and that marker;
// delimiter handling, common prefix de-duplication, the MaxKeys limit and
// truncation detection are done client-side. Temporary multipart blobs are
// never listed. Every object is reported as owned by the bucket owner taken
// from the bucket ACL.
//
// The listing is truncated only when at least one more entry exists after
// MaxKeys entries were collected; NextContinuationToken is then the last
// returned key or common prefix, and empty otherwise.
func (az *Azure) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error) {
	// Retrieve the bucket acl to get the bucket owner
	// All the objects in the bucket are owned by the bucket owner
	aclBytes, err := az.getContainerMetaData(ctx, *input.Bucket, string(keyAclCapital))
	if err != nil {
		return s3response.ListObjectsV2Result{}, azureErrToS3Err(err)
	}

	acl, err := auth.ParseACL(aclBytes)
	if err != nil {
		return s3response.ListObjectsV2Result{}, err
	}

	client, err := az.getContainerClient(*input.Bucket)
	if err != nil {
		// Report the failure instead of returning an empty, successful listing.
		return s3response.ListObjectsV2Result{}, err
	}

	var maxKeys int32 = defaultListingMaxKeys
	if input.MaxKeys != nil {
		maxKeys = *input.MaxKeys
	}

	delimiter := backend.GetStringFromPtr(input.Delimiter)
	prefix := backend.GetStringFromPtr(input.Prefix)
	startAfterVal := backend.GetStringFromPtr(input.StartAfter)
	continuationTokenVal := backend.GetStringFromPtr(input.ContinuationToken)

	// Take the lexicographically larger of startAfter and continuationToken so
	// listing starts strictly after both constraints.
	effectiveMarker := startAfterVal
	if continuationTokenVal > effectiveMarker {
		effectiveMarker = continuationTokenVal
	}

	if maxKeys == 0 {
		// Nothing can be returned; skip the listing entirely.
		isFalse := false
		return s3response.ListObjectsV2Result{
			IsTruncated:       &isFalse,
			MaxKeys:           &maxKeys,
			Name:              input.Bucket,
			Prefix:            backend.GetPtrFromString(prefix),
			ContinuationToken: backend.GetPtrFromString(continuationTokenVal),
			Delimiter:         backend.GetPtrFromString(delimiter),
			StartAfter:        backend.GetPtrFromString(startAfterVal),
			CommonPrefixes:    []types.CommonPrefix{},
		}, nil
	}

	// Use flat listing (empty delimiter) and handle delimiter logic client-side,
	// matching S3 semantics. Only pass Prefix and Marker to Azure.
	// effectiveMarker is passed as Marker so Azure skips blobs before it.
	pager := client.NewListBlobsHierarchyPager("", &container.ListBlobsHierarchyOptions{
		Prefix: input.Prefix,
		Marker: backend.GetPtrFromString(effectiveMarker),
	})

	var objects []s3response.Object
	var cPrefixes []types.CommonPrefix
	cpSet := make(map[string]struct{})

	// pastMax is set once maxKeys entries were collected; the next entry
	// that would have been added proves that the listing is truncated.
	var pastMax, isTruncated bool
	var candidateMarker string
	var totalFound int32

loop:
	for pager.More() {
		resp, err := pager.NextPage(ctx)
		if err != nil {
			return s3response.ListObjectsV2Result{}, azureErrToS3Err(err)
		}

		for _, v := range resp.Segment.BlobItems {
			name := backend.GetStringFromPtr(v.Name)

			// Filter out multipart upload blobs
			if strings.HasPrefix(name, string(metaTmpMultipartPrefix)) {
				continue
			}

			// Apply delimiter logic to determine if this blob contributes to
			// a common prefix or is a regular object
			isCP := false
			cpKey := ""
			if delimiter != "" {
				suffix := strings.TrimPrefix(name, prefix)
				if before, _, found := strings.Cut(suffix, delimiter); found {
					isCP = true
					cpKey = prefix + before + delimiter
				}
			}

			if isCP {
				// Skip common prefixes at or before the effective marker
				if cpKey <= effectiveMarker {
					continue
				}
				// Deduplicate: multiple blobs can map to the same common prefix
				if _, exists := cpSet[cpKey]; exists {
					continue
				}
				// If we already reached maxKeys, this new unique CP means truncation
				if pastMax {
					isTruncated = true
					break loop
				}

				cp := cpKey
				cPrefixes = append(cPrefixes, types.CommonPrefix{Prefix: &cp})
				cpSet[cpKey] = struct{}{}
				candidateMarker = cpKey
			} else {
				if pastMax {
					isTruncated = true
					break loop
				}

				objects = append(objects, s3response.Object{
					ETag:         backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
					Key:          v.Name,
					LastModified: v.Properties.LastModified,
					Size:         v.Properties.ContentLength,
					StorageClass: types.ObjectStorageClassStandard,
					Owner: &types.Owner{
						ID: &acl.Owner,
					},
				})
				candidateMarker = name
			}

			totalFound++
			if totalFound == maxKeys {
				pastMax = true
			}
		}
	}

	// A continuation token is only meaningful for a truncated listing.
	if !isTruncated {
		candidateMarker = ""
	}

	keyCount := int32(len(objects) + len(cPrefixes))

	return s3response.ListObjectsV2Result{
		Contents:              objects,
		ContinuationToken:     backend.GetPtrFromString(continuationTokenVal),
		KeyCount:              &keyCount,
		MaxKeys:               &maxKeys,
		Name:                  input.Bucket,
		NextContinuationToken: backend.GetPtrFromString(candidateMarker),
		Prefix:                backend.GetPtrFromString(prefix),
		IsTruncated:           &isTruncated,
		Delimiter:             backend.GetPtrFromString(delimiter),
		CommonPrefixes:        cPrefixes,
		StartAfter:            backend.GetPtrFromString(startAfterVal),
	}, nil
}
// DeleteObject deletes the blob backing the object.
//
// When any of IfMatch, IfMatchLastModifiedTime or IfMatchSize is set, the
// object is looked up first and the delete preconditions are evaluated
// against its ETag, last-modified time and size; a missing object skips the
// preconditions. Deleting an object that does not exist succeeds, matching
// S3 behavior.
func (az *Azure) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
	if input.IfMatch != nil || input.IfMatchLastModifiedTime != nil || input.IfMatchSize != nil {
		// evaluate the preconditions before deleting the object
		props, err := az.HeadObject(ctx, &s3.HeadObjectInput{
			Bucket: input.Bucket,
			Key:    input.Key,
		})
		switch {
		case err == nil:
			var etag string
			if props.ETag != nil {
				etag = *props.ETag
			}
			var lastMod time.Time
			if props.LastModified != nil {
				lastMod = *props.LastModified
			}
			var size int64
			if props.ContentLength != nil {
				size = *props.ContentLength
			}

			err := backend.EvaluateObjectDeletePreconditions(etag, lastMod, size,
				backend.ObjectDeletePreconditions{
					IfMatch:            input.IfMatch,
					IfMatchLastModTime: input.IfMatchLastModifiedTime,
					IfMatchSize:        input.IfMatchSize,
				})
			if err != nil {
				return nil, err
			}
		case !errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)):
			// if object doesn't exist, skip preconditions
			// if unexpected error shows up, return the error
			return nil, err
		}
	}

	_, err := az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil)

	// errors.As also matches a ResponseError that has been wrapped, which a
	// direct type assertion would miss.
	var respErr *azcore.ResponseError
	if errors.As(err, &respErr) && respErr.StatusCode == 404 {
		// if the object does not exist, S3 returns success
		return &s3.DeleteObjectOutput{}, nil
	}

	return &s3.DeleteObjectOutput{}, azureErrToS3Err(err)
}
// DeleteObjects deletes the requested objects one by one via DeleteObject.
//
// Failures never abort the batch: each key ends up either in the Deleted
// list or in the Error list. S3 errors are reported with their own code and
// description; any other error is reported as "InternalError" with the error
// text as message. The returned error is always nil.
func (az *Azure) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput) (s3response.DeleteResult, error) {
	deleted := []types.DeletedObject{}
	failed := []types.Error{}

	for _, obj := range input.Delete.Objects {
		_, err := az.DeleteObject(ctx, &s3.DeleteObjectInput{
			Bucket: input.Bucket,
			Key:    obj.Key,
		})
		if err == nil {
			deleted = append(deleted, types.DeletedObject{Key: obj.Key})
			continue
		}

		failure := types.Error{Key: obj.Key}
		if serr, ok := err.(s3err.S3Error); ok {
			code := serr.BaseError().Code
			message := serr.BaseError().Description
			failure.Code = &code
			failure.Message = &message
		} else {
			failure.Code = backend.GetPtrFromString("InternalError")
			failure.Message = backend.GetPtrFromString(err.Error())
		}
		failed = append(failed, failure)
	}

	return s3response.DeleteResult{
		Deleted: deleted,
		Error:   failed,
	}, nil
}
// CopyObject copies the object named by input.CopySource to
// input.Bucket/input.Key.
//
// Steps, in order:
//  1. If ExpectedSourceBucketOwner is set, the owner in the source bucket ACL
//     must match it, otherwise ErrAccessDenied is returned.
//  2. If any CopySourceIf* condition is set, it is evaluated by a HeadObject
//     call against the source object.
//  3. When source and destination are the same object, nothing is re-uploaded:
//     HTTP headers, metadata, legal hold, retention and (optionally) tags are
//     rewritten in place. This requires MetadataDirective REPLACE, otherwise
//     ErrInvalidCopyDest is returned.
//  4. Otherwise the source is downloaded and written through PutObject, and
//     the source tags are copied when TaggingDirective is COPY.
//
// In both paths a source larger than az.copyObjectThreshold is rejected.
func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInput) (s3response.CopyObjectOutput, error) {
dstClient, err := az.getBlobClient(*input.Bucket, *input.Key)
if err != nil {
return s3response.CopyObjectOutput{}, err
}
srcBucket, srcObj, _, err := backend.ParseCopySource(*input.CopySource)
if err != nil {
return s3response.CopyObjectOutput{}, err
}
// Verify the source bucket owner only when the caller asked for it.
if input.ExpectedSourceBucketOwner != nil && *input.ExpectedSourceBucketOwner != "" {
aclData, err := az.GetBucketAcl(ctx, &s3.GetBucketAclInput{Bucket: &srcBucket})
if err != nil {
return s3response.CopyObjectOutput{}, err
}
srcAcl, err := auth.ParseACL(aclData)
if err != nil {
return s3response.CopyObjectOutput{}, err
}
if srcAcl.Owner != *input.ExpectedSourceBucketOwner {
return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrAccessDenied)
}
}
// The copy-source preconditions are delegated to HeadObject on the source;
// its result is discarded, only the error matters.
if !areNils(input.CopySourceIfMatch, input.CopySourceIfNoneMatch) || !areNils(input.CopySourceIfModifiedSince, input.CopySourceIfUnmodifiedSince) {
_, err = az.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &srcBucket,
Key: &srcObj,
IfMatch: input.CopySourceIfMatch,
IfNoneMatch: input.CopySourceIfNoneMatch,
IfModifiedSince: input.CopySourceIfModifiedSince,
IfUnmodifiedSince: input.CopySourceIfUnmodifiedSince,
})
if err != nil {
return s3response.CopyObjectOutput{}, err
}
}
// Copy onto itself: update the object's attributes in place.
if srcBucket == *input.Bucket && srcObj == *input.Key {
if input.MetadataDirective != types.MetadataDirectiveReplace {
return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrInvalidCopyDest)
}
resp, err := dstClient.GetProperties(ctx, nil)
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
if resp.ContentLength != nil && *resp.ContentLength > az.copyObjectThreshold {
return s3response.CopyObjectOutput{}, s3err.GetCopySourceObjectTooLargeErr(az.copyObjectThreshold)
}
// Set object meta http headers
res, err := dstClient.SetHTTPHeaders(ctx, blob.HTTPHeaders{
BlobCacheControl: input.CacheControl,
BlobContentDisposition: input.ContentDisposition,
BlobContentEncoding: input.ContentEncoding,
BlobContentLanguage: input.ContentLanguage,
BlobContentType: input.ContentType,
}, nil)
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
// NOTE(review): when input.Metadata is non-nil, the keys added below are
// written into the caller's map rather than into a copy.
meta := input.Metadata
if meta == nil {
meta = make(map[string]string)
}
// Embed "Expires" in object metadata
if getString(input.Expires) != "" {
meta[string(keyExpires)] = *input.Expires
}
// Embed the website redirect location in object metadata
if getString(input.WebsiteRedirectLocation) != "" {
meta[string(keyWebsiteRedirect)] = *input.WebsiteRedirectLocation
}
// Set object metadata
_, err = dstClient.SetMetadata(ctx, parseMetadata(meta), nil)
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
// Set object legal hold
if input.ObjectLockLegalHoldStatus != "" {
err = az.PutObjectLegalHold(ctx, *input.Bucket, *input.Key, "", input.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn)
if err != nil {
// Report the "no spaces" variant of the missing-lock-configuration error.
if errors.Is(err, s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)) {
err = s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
}
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
}
// Set object retention
if input.ObjectLockMode != "" && input.ObjectLockRetainUntilDate != nil {
retention := s3response.PutObjectRetentionInput{
Mode: types.ObjectLockRetentionMode(input.ObjectLockMode),
RetainUntilDate: s3response.AmzDate{
Time: *input.ObjectLockRetainUntilDate,
},
}
retParsed, err := json.Marshal(retention)
if err != nil {
return s3response.CopyObjectOutput{}, fmt.Errorf("parse object retention: %w", err)
}
err = az.PutObjectRetention(ctx, *input.Bucket, *input.Key, "", retParsed)
if err != nil {
// Same error translation as for the legal hold above.
if errors.Is(err, s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)) {
err = s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
}
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
}
// Set object Tagging, if tagging directive is "REPLACE"
if input.TaggingDirective == types.TaggingDirectiveReplace {
tags, err := backend.ParseObjectTags(getString(input.Tagging))
if err != nil {
return s3response.CopyObjectOutput{}, err
}
_, err = dstClient.SetTags(ctx, tags, nil)
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
}
// ETag and LastModified come from the SetHTTPHeaders response above, i.e.
// they were captured before the later metadata/tag updates.
return s3response.CopyObjectOutput{
CopyObjectResult: &s3response.CopyObjectResult{
LastModified: res.LastModified,
ETag: backend.GetPtrFromString(convertAzureEtag(res.ETag)),
},
}, nil
}
// Get the source object
downloadResp, err := az.client.DownloadStream(ctx, srcBucket, srcObj, nil)
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
defer downloadResp.Body.Close()
if downloadResp.ContentLength != nil && *downloadResp.ContentLength > az.copyObjectThreshold {
return s3response.CopyObjectOutput{}, s3err.GetCopySourceObjectTooLargeErr(az.copyObjectThreshold)
}
// By default the destination takes its headers and metadata from the request.
pInput := s3response.PutObjectInput{
Body: downloadResp.Body,
Bucket: input.Bucket,
Key: input.Key,
ContentLength: downloadResp.ContentLength,
ContentType: input.ContentType,
ContentEncoding: input.ContentEncoding,
ContentDisposition: input.ContentDisposition,
ContentLanguage: input.ContentLanguage,
CacheControl: input.CacheControl,
Expires: input.Expires,
WebsiteRedirectLocation: input.WebsiteRedirectLocation,
Metadata: input.Metadata,
ObjectLockRetainUntilDate: input.ObjectLockRetainUntilDate,
ObjectLockMode: input.ObjectLockMode,
ObjectLockLegalHoldStatus: input.ObjectLockLegalHoldStatus,
}
// With the COPY directive the headers and metadata come from the source instead.
if input.MetadataDirective == types.MetadataDirectiveCopy {
// Expires is in downloadResp.Metadata
pInput.Expires = nil
pInput.CacheControl = downloadResp.CacheControl
pInput.ContentDisposition = downloadResp.ContentDisposition
pInput.ContentEncoding = downloadResp.ContentEncoding
pInput.ContentLanguage = downloadResp.ContentLanguage
pInput.ContentType = downloadResp.ContentType
pInput.Metadata = parseAzMetadata(downloadResp.Metadata)
// The source's stored redirect location is not carried over;
// pInput.WebsiteRedirectLocation still holds the request value set above.
delete(pInput.Metadata, string(keyWebsiteRedirect))
}
if input.TaggingDirective == types.TaggingDirectiveReplace {
pInput.Tagging = input.Tagging
}
// Create the destination object
resp, err := az.PutObject(ctx, pInput)
if err != nil {
return s3response.CopyObjectOutput{}, err
}
// Copy the object tagging, if tagging directive is "COPY"
if input.TaggingDirective == types.TaggingDirectiveCopy {
srcClient, err := az.getBlobClient(srcBucket, srcObj)
if err != nil {
return s3response.CopyObjectOutput{}, err
}
res, err := srcClient.GetTags(ctx, nil)
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
_, err = dstClient.SetTags(ctx, parseAzTags(res.BlobTagSet), nil)
if err != nil {
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
}
}
// Only the ETag is populated in this path; LastModified is left unset.
return s3response.CopyObjectOutput{
CopyObjectResult: &s3response.CopyObjectResult{
ETag: &resp.ETag,
},
}, nil
}
// PutObjectTagging replaces the tag set of the blob bucket/object with tags.
// The version id argument is ignored. Azure errors are translated with
// azureErrToS3Err.
func (az *Azure) PutObjectTagging(ctx context.Context, bucket, object, _ string, tags map[string]string) error {
	blobClient, err := az.getBlobClient(bucket, object)
	if err != nil {
		return err
	}

	if _, err := blobClient.SetTags(ctx, tags, nil); err != nil {
		return azureErrToS3Err(err)
	}
	return nil
}
// GetObjectTagging returns the tags of the blob bucket/object, converted from
// the Azure tag set with parseAzTags. The version id argument is ignored.
func (az *Azure) GetObjectTagging(ctx context.Context, bucket, object, _ string) (map[string]string, error) {
	blobClient, err := az.getBlobClient(bucket, object)
	if err != nil {
		return nil, err
	}

	resp, err := blobClient.GetTags(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	tagSet := parseAzTags(resp.BlobTagSet)
	return tagSet, nil
}
// DeleteObjectTagging clears the tags of the blob bucket/object by setting an
// empty tag map on it. The version id argument is ignored.
func (az *Azure) DeleteObjectTagging(ctx context.Context, bucket, object, _ string) error {
	blobClient, err := az.getBlobClient(bucket, object)
	if err != nil {
		return err
	}

	noTags := map[string]string{}
	if _, err := blobClient.SetTags(ctx, noTags, nil); err != nil {
		return azureErrToS3Err(err)
	}
	return nil
}
// CreateMultipartUpload initiates a multipart upload.
//
// It uploads an empty marker blob at createMetaTmpPath(key, uploadId) that
// carries everything needed at completion time: user metadata, the object
// name, Expires, the website redirect location, legal hold and retention
// settings, tags and the content HTTP headers.
//
// When a legal hold status or an object lock mode is requested, the bucket
// lock configuration stored under keyBucketLock must exist and be enabled;
// otherwise ErrMissingObjectLockConfigurationNoSpaces is returned.
//
// On success the bucket, key and a newly generated UUID upload id are returned.
func (az *Azure) CreateMultipartUpload(ctx context.Context, input s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
	if input.ObjectLockLegalHoldStatus != "" || input.ObjectLockMode != "" {
		bucketLock, err := az.getContainerMetaData(ctx, *input.Bucket, string(keyBucketLock))
		if err != nil {
			return s3response.InitiateMultipartUploadResult{}, azureErrToS3Err(err)
		}

		if len(bucketLock) == 0 {
			return s3response.InitiateMultipartUploadResult{},
				s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
		}

		var bucketLockConfig auth.BucketLockConfig
		err = json.Unmarshal(bucketLock, &bucketLockConfig)
		if err != nil {
			return s3response.InitiateMultipartUploadResult{},
				fmt.Errorf("parse bucket lock config: %w", err)
		}

		if !bucketLockConfig.Enabled {
			return s3response.InitiateMultipartUploadResult{},
				s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
		}
	}

	meta := parseMetadata(input.Metadata)
	if meta == nil {
		// parseMetadata returns a nil map for nil input; the assignments
		// below would panic on a nil map, so start from an empty one.
		meta = make(map[string]*string)
	}
	meta[string(onameAttr)] = input.Key

	if getString(input.Expires) != "" {
		meta[string(keyExpires)] = input.Expires
	}
	if getString(input.WebsiteRedirectLocation) != "" {
		meta[string(keyWebsiteRedirect)] = input.WebsiteRedirectLocation
	}

	// parse object tags
	tags, err := backend.ParseObjectTags(getString(input.Tagging))
	if err != nil {
		return s3response.InitiateMultipartUploadResult{}, err
	}

	// set blob legal hold status in metadata
	if input.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
		meta[string(keyObjLegalHold)] = backend.GetPtrFromString("1")
	}

	// set blob retention date
	if input.ObjectLockMode != "" {
		retention := types.ObjectLockRetention{
			Mode:            types.ObjectLockRetentionMode(input.ObjectLockMode),
			RetainUntilDate: input.ObjectLockRetainUntilDate,
		}
		retParsed, err := json.Marshal(retention)
		if err != nil {
			return s3response.InitiateMultipartUploadResult{}, azureErrToS3Err(err)
		}
		meta[string(keyObjRetention)] = backend.GetPtrFromString(string(retParsed))
	}

	uploadId := uuid.New().String()

	tmpPath := createMetaTmpPath(*input.Key, uploadId)

	opts := &blockblob.UploadBufferOptions{
		Metadata: meta,
		Tags:     tags,
		HTTPHeaders: &blob.HTTPHeaders{
			BlobContentType:        input.ContentType,
			BlobContentEncoding:    input.ContentEncoding,
			BlobCacheControl:       input.CacheControl,
			BlobContentDisposition: input.ContentDisposition,
			BlobContentLanguage:    input.ContentLanguage,
		},
	}

	// Create an empty blob at the multipart tmp path.
	// The blob indicates multipart upload initialization and holds the mp metadata
	// e.g. tagging, content-type, metadata, object lock status ...
	_, err = az.client.UploadBuffer(ctx, *input.Bucket, tmpPath, []byte{}, opts)
	if err != nil {
		return s3response.InitiateMultipartUploadResult{}, azureErrToS3Err(err)
	}

	return s3response.InitiateMultipartUploadResult{
		Bucket:   *input.Bucket,
		Key:      *input.Key,
		UploadId: uploadId,
	}, nil
}
// UploadPart stores one part of a multipart upload as an uncommitted block
// staged on the destination blob.
//
// The block id derived from the part number doubles as the returned ETag.
// Zero-byte parts are not staged; they are recorded through
// trackZeroBytePart instead.
func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
	if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
		return nil, err
	}

	// TODO: request streamable version of StageBlock()
	// (*blockblob.Client).StageBlock has no streaming variant at this time,
	// so the body is cached in memory to obtain an io.ReadSeekCloser.
	body, err := getReadSeekCloser(input.Body)
	if err != nil {
		return nil, err
	}

	// block id serves as etag here
	blockID := blockIDInt32ToBase64(*input.PartNumber)

	// Azure StageBlock rejects Content-Length: 0 as an invalid header value,
	// so measure the body first and rewind it before any upload.
	partSize, err := body.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}
	if _, err := body.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}

	if partSize == 0 {
		// Track the empty part in the sgwtmp metadata instead of staging it.
		trackErr := az.trackZeroBytePart(ctx, *input.Bucket, *input.Key, *input.UploadId, *input.PartNumber)
		if trackErr != nil {
			return nil, trackErr
		}
		return &s3.UploadPartOutput{ETag: &blockID}, nil
	}

	blockClient, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
	if err != nil {
		return nil, err
	}

	if _, err := blockClient.StageBlock(ctx, blockID, body, nil); err != nil {
		return nil, parseMpError(err)
	}

	return &s3.UploadPartOutput{ETag: &blockID}, nil
}
// UploadPartCopy stages a block on the destination blob from the URL given in
// input.CopySource, using the block id derived from the part number.
//
// The returned CopyPartResult is always empty.
func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyPartResult, error) {
	var empty s3response.CopyPartResult

	blockClient, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
	if err != nil {
		return empty, err
	}

	if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
		return empty, err
	}

	blockID := blockIDInt32ToBase64(*input.PartNumber)

	//TODO: handle block copy by range
	//TODO: the action returns not implemented on azurite, maybe in production this will work?
	if _, err := blockClient.StageBlockFromURL(ctx, blockID, *input.CopySource, nil); err != nil {
		return empty, parseMpError(err)
	}

	return empty, nil
}
// ListParts lists the parts uploaded so far for a multipart upload.
//
// Parts come from two places: the uncommitted blocks staged on the
// destination blob, and the zero-byte parts tracked in the sgwtmp metadata.
// Parts whose number is not greater than input.PartNumberMarker are skipped,
// the remainder is sorted by part number and truncated to input.MaxParts.
//
// LastModified of every part is set to the time of the call.
func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) {
	err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId)
	if err != nil {
		return s3response.ListPartsResult{}, err
	}
	client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
	if err != nil {
		// Propagate the failure instead of reporting an empty, successful listing.
		return s3response.ListPartsResult{}, err
	}

	var partNumberMarker int
	var nextPartNumberMarker int
	var maxParts int32 = math.MaxInt32
	var isTruncated bool

	// A missing marker is treated the same as an empty one.
	if input.PartNumberMarker != nil && *input.PartNumberMarker != "" {
		partNumberMarker, err = strconv.Atoi(*input.PartNumberMarker)
		if err != nil {
			return s3response.ListPartsResult{},
				s3err.GetInvalidArgMaxLimiter("part-number-marker", *input.PartNumberMarker)
		}
	}
	if input.MaxParts != nil {
		maxParts = *input.MaxParts
	}

	resp, blockListErr := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
	if blockListErr != nil {
		s3Err := azureErrToS3Err(blockListErr)
		if !errors.Is(s3Err, s3err.GetAPIError(s3err.ErrNoSuchKey)) {
			// Return the translated error, as CompleteMultipartUpload does.
			return s3response.ListPartsResult{}, s3Err
		}
		// NoSuchKey means no blocks have been staged yet (possible if only zero-byte
		// parts exist). Continue so we can still return those from metadata.
	}

	parts := []s3response.Part{}
	if blockListErr == nil {
		for _, el := range resp.UncommittedBlocks {
			partNumber, err := decodeBlockId(*el.Name)
			if err != nil {
				return s3response.ListPartsResult{}, err
			}
			if partNumberMarker >= partNumber {
				continue
			}
			parts = append(parts, s3response.Part{
				Size:         *el.Size,
				ETag:         *el.Name,
				PartNumber:   partNumber,
				LastModified: time.Now(),
			})
		}
	}

	// Merge in zero-byte parts tracked in the sgwtmp metadata.
	// NOTE(review): a lookup error is ignored here, so zero-byte parts are
	// simply missing from the listing when it fails.
	zbParts, _ := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId)
	for _, zbPartNum := range zbParts {
		if partNumberMarker >= int(zbPartNum) {
			continue
		}
		parts = append(parts, s3response.Part{
			Size:         0,
			ETag:         blockIDInt32ToBase64(zbPartNum),
			PartNumber:   int(zbPartNum),
			LastModified: time.Now(),
		})
	}

	// Sort by part number and apply maxParts limit.
	sort.Slice(parts, func(i, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})

	// Clamp the limit so a zero or negative max-parts cannot produce an
	// out-of-range slice bound or index.
	limit := int(maxParts)
	if limit < 0 {
		limit = 0
	}
	if len(parts) > limit {
		parts = parts[:limit]
		// With an empty page there is no last part to continue from, so the
		// listing is only flagged as truncated when a part was returned.
		if limit > 0 {
			nextPartNumberMarker = parts[limit-1].PartNumber
			isTruncated = true
		}
	}

	return s3response.ListPartsResult{
		Bucket:               *input.Bucket,
		Key:                  *input.Key,
		Parts:                parts,
		NextPartNumberMarker: nextPartNumberMarker,
		PartNumberMarker:     partNumberMarker,
		IsTruncated:          isTruncated,
		MaxParts:             int(maxParts),
		StorageClass:         types.StorageClassStandard,
	}, nil
}
// ListMultipartUploads lists the in-progress multipart uploads of a bucket.
//
// Uploads are discovered by listing the blobs under metaTmpMultipartPrefix
// together with their metadata. A blob counts as an upload when it carries
// the object-name metadata entry; the upload id is the third "/"-separated
// segment of the blob name. Entries are pre-filtered by key marker and prefix,
// sorted by key and then initiation time, and handed to
// backend.ListMultipartUploads for delimiter/marker/limit handling.
//
// input.MaxUploads is dereferenced unconditionally and must be set by the
// caller.
func (az *Azure) ListMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) {
	var bucket string
	if input.Bucket != nil {
		bucket = *input.Bucket
	}
	client, err := az.getContainerClient(bucket)
	if err != nil {
		return s3response.ListMultipartUploadsResult{}, err
	}

	var delimiter string
	if input.Delimiter != nil {
		delimiter = *input.Delimiter
	}
	var prefix string
	if input.Prefix != nil {
		prefix = *input.Prefix
	}
	var keyMarker string
	if input.KeyMarker != nil {
		keyMarker = *input.KeyMarker
	}
	var uploadIDMarker string
	if input.UploadIdMarker != nil {
		uploadIDMarker = *input.UploadIdMarker
	}
	maxUploads := int(*input.MaxUploads)

	mpPrefix := string(metaTmpMultipartPrefix)
	pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
		Include: container.ListBlobsInclude{Metadata: true},
		Prefix:  &mpPrefix,
	})

	uploads := []s3response.Upload{}
	for pager.More() {
		resp, err := pager.NextPage(ctx)
		if err != nil {
			return s3response.ListMultipartUploadsResult{}, azureErrToS3Err(err)
		}

		for _, el := range resp.Segment.BlobItems {
			// Blobs without a usable object-name entry or name are not uploads.
			key, ok := el.Metadata[string(onameAttrLower)]
			if !ok || key == nil || el.Name == nil {
				continue
			}
			if keyMarker != "" && *key <= keyMarker {
				continue
			}
			if prefix != "" && !strings.HasPrefix(*key, prefix) {
				continue
			}

			// filepath.Clean may return OS-specific separators; normalise to
			// "/" before splitting, and skip names that are too short to
			// contain an upload id instead of indexing out of range.
			segments := strings.Split(filepath.ToSlash(filepath.Clean(*el.Name)), "/")
			if len(segments) < 3 {
				continue
			}
			uploadId := segments[2]

			// Leave Initiated at its zero value when Azure reports no creation time.
			var initiated time.Time
			if el.Properties != nil && el.Properties.CreationTime != nil {
				initiated = *el.Properties.CreationTime
			}

			uploads = append(uploads, s3response.Upload{
				Key:          *key,
				Initiated:    initiated,
				UploadID:     uploadId,
				StorageClass: types.StorageClassStandard,
			})
		}
	}

	// Sort once: Key asc, Initiated asc
	sort.SliceStable(uploads, func(i, j int) bool {
		if uploads[i].Key != uploads[j].Key {
			return uploads[i].Key < uploads[j].Key
		}
		return uploads[i].Initiated.Before(uploads[j].Initiated)
	})

	result, err := backend.ListMultipartUploads(uploads, prefix, delimiter, keyMarker, uploadIDMarker, maxUploads)
	if err != nil {
		return s3response.ListMultipartUploadsResult{}, err
	}

	return s3response.ListMultipartUploadsResult{
		Bucket:             bucket,
		Delimiter:          delimiter,
		KeyMarker:          keyMarker,
		MaxUploads:         maxUploads,
		Prefix:             prefix,
		NextKeyMarker:      result.NextKeyMarker,
		NextUploadIDMarker: result.NextUploadIDMarker,
		UploadIDMarker:     uploadIDMarker,
		IsTruncated:        result.IsTruncated,
		Uploads:            result.Uploads,
		CommonPrefixes:     result.CommonPrefixes,
	}, nil
}
// AbortMultipartUpload aborts a multipart upload.
//
// It deletes the upload's marker blob at createMetaTmpPath(key, uploadId) and
// then the destination blob holding the staged blocks. A missing destination
// blob is not an error.
//
// When input.IfMatchInitiatedTime is set, the upload is only aborted if that
// time matches the marker blob's initiation time at second precision;
// otherwise a precondition-failed error is returned.
func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {
	tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)

	if input.IfMatchInitiatedTime != nil {
		client, err := az.getBlobClient(*input.Bucket, tmpPath)
		if err != nil {
			return err
		}

		resp, err := client.GetProperties(ctx, nil)
		if err != nil {
			// The marker blob represents the upload, so translate the error
			// the same way as the other marker-blob operations do.
			return parseMpError(err)
		}

		// ListMultipartUploads reports the blob creation time as the
		// initiation time, so compare against that. Last-Modified is only a
		// fallback because it moves whenever the marker blob is rewritten.
		initiated := resp.CreationTime
		if initiated == nil {
			initiated = resp.LastModified
		}
		if initiated != nil && initiated.Unix() != input.IfMatchInitiatedTime.Unix() {
			return s3err.GetPreconditionFailedErr(s3err.ConditionIfMatchInitiatedTime)
		}
	}

	_, err := az.client.DeleteBlob(ctx, *input.Bucket, tmpPath, nil)
	if err != nil {
		return parseMpError(err)
	}

	// Cleanup the uploaded parts
	_, err = az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil)
	if err != nil {
		err = azureErrToS3Err(err)
		// No destination blob means there is nothing to clean up.
		if errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)) {
			return nil
		}

		return err
	}

	return nil
}
// CompleteMultipartUpload finishes a multipart upload.
//
// It commits the uncommitted blocks named in the request onto the destination
// blob, copies the multipart metadata, tags and HTTP headers from the marker
// blob in the .sgwtmp namespace onto it, and then deletes the marker blob,
// which marks the end of the multipart upload.
//
// The requested part list must match the staged blocks plus the tracked
// zero-byte parts exactly, be in strictly ascending part-number order, and
// every part except the last must be at least backend.MinPartSize.
//
// The second (string) return value is always empty in this implementation.
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
var res s3response.CompleteMultipartUploadResult
err := az.evaluateWritePreconditions(ctx, input.Bucket, input.Key, input.IfMatch, input.IfNoneMatch)
if err != nil {
return s3response.CompleteMultipartUploadResult{}, "", err
}
tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)
blobClient, err := az.getBlobClient(*input.Bucket, tmpPath)
if err != nil {
return res, "", err
}
props, err := blobClient.GetProperties(ctx, nil)
if err != nil {
mpErr := parseMpError(err)
// If the tmp blob is already gone, the upload may have already been
// completed. Check the final object's mp-metadata and return success
// if the upload IDs match.
if errors.Is(mpErr, s3err.GetAPIError(s3err.ErrNoSuchUpload)) {
finalClient, clientErr := az.getBlobClient(*input.Bucket, *input.Key)
if clientErr == nil {
finalProps, propErr := finalClient.GetProperties(ctx, nil)
if propErr == nil {
if mpMetaStr, ok := finalProps.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
mpMeta, metaErr := backend.UnmarshalMpUploadMetadata([]byte(*mpMetaStr), true)
if metaErr == nil && mpMeta.UploadID == *input.UploadId {
return s3response.CompleteMultipartUploadResult{
Bucket: input.Bucket,
Key: input.Key,
ETag: backend.GetPtrFromString(convertAzureEtag(finalProps.ETag)),
}, "", nil
}
}
}
}
}
// Any failure in the lookup above falls through to the original error.
return res, "", mpErr
}
tags, err := blobClient.GetTags(ctx, nil)
if err != nil {
return res, "", parseMpError(err)
}
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return res, "", err
}
blockIds := []string{}
blockList, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
if err != nil {
if !errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) {
return res, "", azureErrToS3Err(err)
}
// NoSuchKey: no blocks staged; only zero-byte parts may exist.
}
// Collect zero-byte parts tracked in the sgwtmp metadata.
zbParts, err := az.getZeroByteParts(ctx, *input.Bucket, *input.Key, *input.UploadId)
if err != nil {
return res, "", err
}
zbPartsMap := make(map[int32]bool, len(zbParts))
for _, p := range zbParts {
zbPartsMap[p] = true
}
// The request must name exactly as many parts as were uploaded
// (staged blocks plus tracked zero-byte parts).
if len(blockList.UncommittedBlocks)+len(zbParts) != len(input.MultipartUpload.Parts) {
return res, "", s3err.GetInvalidPartErr(*input.UploadId, 0, "")
}
// Index the staged blocks by the part number decoded from the block id.
uncommittedBlocks := map[int32]*blockblob.Block{}
for _, el := range blockList.UncommittedBlocks {
ptNumber, err := decodeBlockId(backend.GetStringFromPtr(el.Name))
if err != nil {
return res, "", fmt.Errorf("invalid block name: %w", err)
}
uncommittedBlocks[int32(ptNumber)] = el
}
// The initial value is the lower limit of partNumber: 0
var totalSize int64
var partNumber int32
// partSizes[i] = cumulative byte offset after part i+1 (see backend.MpUploadMetadata)
var partSizes []int64
last := len(input.MultipartUpload.Parts) - 1
for i, part := range input.MultipartUpload.Parts {
if part.PartNumber == nil {
return res, "", s3err.GetAPIError(s3err.ErrMalformedXML)
}
if part.ETag == nil {
return res, "", s3err.GetAPIError(s3err.ErrMalformedXML)
}
if *part.PartNumber < 1 {
return res, "", s3err.GetInvalidArgumentErr(s3err.InvalidArgCompleteMpPartNumber, fmt.Sprint(*part.PartNumber))
}
// Part numbers must be strictly ascending.
if *part.PartNumber <= partNumber {
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
block, ok := uncommittedBlocks[*part.PartNumber]
if !ok {
// Check if this is a tracked zero-byte part.
if zbPartsMap[*part.PartNumber] {
expectedETag := blockIDInt32ToBase64(*part.PartNumber)
if getString(part.ETag) != expectedETag {
return res, "", s3err.GetInvalidPartErr(*input.UploadId, *part.PartNumber, expectedETag)
}
// Non-last zero-byte parts violate the minimum part size.
if i < last {
return res, "", s3err.GetEntityTooSmallErr(0, backend.MinPartSize)
}
// Zero-byte parts contribute no data; skip adding to blockIds.
partSizes = append(partSizes, totalSize)
continue
}
return res, "", s3err.GetInvalidPartErr(*input.UploadId, *part.PartNumber, "")
}
// The block id doubles as the part ETag, so the two must be equal.
if *part.ETag != *block.Name {
return res, "", s3err.GetInvalidPartErr(*input.UploadId, *part.PartNumber, getString(part.ETag))
}
// all parts except the last need to be at least
// the minimum allowed size (5 MiB)
if i < last && *block.Size < backend.MinPartSize {
return res, "", s3err.GetEntityTooSmallErr(*block.Size, backend.MinPartSize)
}
totalSize += *block.Size
partSizes = append(partSizes, totalSize)
blockIds = append(blockIds, *block.Name)
}
if input.MpuObjectSize != nil && totalSize != *input.MpuObjectSize {
return res, "",
s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
}
// Remove internal tracking keys from metadata before storing on the final blob.
delete(props.Metadata, string(keyMpZeroBytesParts))
// Serialize multipart metadata so GetObject/HeadObject can serve by part-number.
mpMeta := backend.MpUploadMetadata{UploadID: *input.UploadId, Parts: partSizes}
mpMetaBytes, err := backend.MarshalMpUploadMetadata(mpMeta, true)
if err != nil {
return res, "", fmt.Errorf("marshal mp metadata: %w", err)
}
mpMetaStr := string(mpMetaBytes)
if props.Metadata == nil {
props.Metadata = map[string]*string{}
}
props.Metadata[string(keyMpMetadata)] = &mpMetaStr
// The final blob inherits metadata, tags and HTTP headers from the marker blob.
opts := &blockblob.CommitBlockListOptions{
Metadata: props.Metadata,
Tags: parseAzTags(tags.BlobTagSet),
}
opts.HTTPHeaders = &blob.HTTPHeaders{
BlobContentType: props.ContentType,
BlobContentEncoding: props.ContentEncoding,
BlobContentDisposition: props.ContentDisposition,
BlobContentLanguage: props.ContentLanguage,
BlobCacheControl: props.CacheControl,
}
resp, err := client.CommitBlockList(ctx, blockIds, opts)
if err != nil {
return res, "", parseMpError(err)
}
// cleanup the multipart upload
// NOTE(review): the blocks are already committed at this point, so a failure
// here returns an error even though the final object has been created.
_, err = blobClient.Delete(ctx, nil)
if err != nil {
return res, "", parseMpError(err)
}
return s3response.CompleteMultipartUploadResult{
Bucket: input.Bucket,
Key: input.Key,
ETag: backend.GetPtrFromString(convertAzureEtag(resp.ETag)),
}, "", nil
}
// PutBucketAcl stores data as the bucket ACL in the container metadata entry
// keyAclCapital.
func (az *Azure) PutBucketAcl(ctx context.Context, bucket string, data []byte) error {
	aclKey := string(keyAclCapital)
	err := az.setContainerMetaData(ctx, bucket, aclKey, data)
	return err
}
// GetBucketAcl returns the raw bucket ACL read from the container metadata
// entry keyAclCapital of input.Bucket.
func (az *Azure) GetBucketAcl(ctx context.Context, input *s3.GetBucketAclInput) ([]byte, error) {
	bucket := *input.Bucket
	aclKey := string(keyAclCapital)
	return az.getContainerMetaData(ctx, bucket, aclKey)
}
// PutBucketPolicy stores policy in the container metadata entry keyPolicy.
// A nil policy removes the entry instead.
func (az *Azure) PutBucketPolicy(ctx context.Context, bucket string, policy []byte) error {
	policyKey := string(keyPolicy)
	if policy != nil {
		return az.setContainerMetaData(ctx, bucket, policyKey, policy)
	}
	return az.deleteContainerMetaData(ctx, bucket, policyKey)
}
// GetBucketPolicy returns the bucket policy stored in the container metadata
// entry keyPolicy. An empty value is reported as ErrNoSuchBucketPolicy.
func (az *Azure) GetBucketPolicy(ctx context.Context, bucket string) ([]byte, error) {
	policy, err := az.getContainerMetaData(ctx, bucket, string(keyPolicy))
	switch {
	case err != nil:
		return nil, err
	case len(policy) == 0:
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucketPolicy, bucket)
	default:
		return policy, nil
	}
}
// DeleteBucketPolicy removes the bucket policy by calling PutBucketPolicy
// with a nil policy.
func (az *Azure) DeleteBucketPolicy(ctx context.Context, bucket string) error {
	var noPolicy []byte
	return az.PutBucketPolicy(ctx, bucket, noPolicy)
}
// PutBucketCors stores cors in the container metadata entry keyCors.
// A nil cors value removes the entry instead.
func (az *Azure) PutBucketCors(ctx context.Context, bucket string, cors []byte) error {
	corsKey := string(keyCors)
	if cors != nil {
		return az.setContainerMetaData(ctx, bucket, corsKey, cors)
	}
	return az.deleteContainerMetaData(ctx, bucket, corsKey)
}
// GetBucketCors returns the CORS configuration stored in the container
// metadata entry keyCors. An empty value is reported as
// ErrNoSuchCORSConfiguration.
func (az *Azure) GetBucketCors(ctx context.Context, bucket string) ([]byte, error) {
	cors, err := az.getContainerMetaData(ctx, bucket, string(keyCors))
	switch {
	case err != nil:
		return nil, err
	case len(cors) == 0:
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchCORSConfiguration, bucket)
	default:
		return cors, nil
	}
}
// DeleteBucketCors removes the CORS configuration by calling PutBucketCors
// with a nil value.
func (az *Azure) DeleteBucketCors(ctx context.Context, bucket string) error {
	var noCors []byte
	return az.PutBucketCors(ctx, bucket, noCors)
}
// PutBucketWebsite stores the website configuration in the container metadata
// entry keyWebsite after encoding it with backend.MarshalWebsiteConfig.
// A nil website value removes the entry instead.
func (az *Azure) PutBucketWebsite(ctx context.Context, bucket string, website []byte) error {
	websiteKey := string(keyWebsite)
	if website == nil {
		return az.deleteContainerMetaData(ctx, bucket, websiteKey)
	}

	payload, err := backend.MarshalWebsiteConfig(website, true)
	if err != nil {
		return err
	}
	return az.setContainerMetaData(ctx, bucket, websiteKey, payload)
}
// GetBucketWebsite returns the website configuration stored in the container
// metadata entry keyWebsite, decoded with backend.UnmarshalWebsiteConfig.
// An empty stored value is reported as ErrNoSuchWebsiteConfiguration.
func (az *Azure) GetBucketWebsite(ctx context.Context, bucket string) ([]byte, error) {
	stored, err := az.getContainerMetaData(ctx, bucket, string(keyWebsite))
	if err != nil {
		return nil, err
	}
	if len(stored) == 0 {
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchWebsiteConfiguration, bucket)
	}

	config, decodeErr := backend.UnmarshalWebsiteConfig(stored, true)
	if decodeErr != nil {
		return nil, decodeErr
	}
	return config, nil
}
// DeleteBucketWebsite removes the website configuration by calling
// PutBucketWebsite with a nil value.
func (az *Azure) DeleteBucketWebsite(ctx context.Context, bucket string) error {
	var noWebsite []byte
	return az.PutBucketWebsite(ctx, bucket, noWebsite)
}
// PutObjectLockConfiguration stores config as the bucket's object lock
// configuration in the container metadata entry keyBucketLock.
func (az *Azure) PutObjectLockConfiguration(ctx context.Context, bucket string, config []byte) error {
	lockKey := string(keyBucketLock)
	err := az.setContainerMetaData(ctx, bucket, lockKey, config)
	return err
}
// GetObjectLockConfiguration returns the object lock configuration stored in
// the container metadata entry keyBucketLock. An empty value is reported as
// ErrObjectLockConfigurationNotFound.
func (az *Azure) GetObjectLockConfiguration(ctx context.Context, bucket string) ([]byte, error) {
	lockCfg, err := az.getContainerMetaData(ctx, bucket, string(keyBucketLock))
	switch {
	case err != nil:
		return nil, err
	case len(lockCfg) == 0:
		return nil, s3err.GetBucketErr(s3err.ErrObjectLockConfigurationNotFound, bucket)
	default:
		return lockCfg, nil
	}
}
// PutObjectRetention stores retention in the blob metadata entry
// keyObjRetention of bucket/object.
//
// The bucket must have object lock enabled. The blob's current metadata is
// read first and written back with the retention entry added or replaced.
// The versionId argument is not used.
func (az *Azure) PutObjectRetention(ctx context.Context, bucket, object, versionId string, retention []byte) error {
	if err := az.isBucketObjectLockEnabled(ctx, bucket); err != nil {
		return err
	}

	blobClient, err := az.getBlobClient(bucket, object)
	if err != nil {
		return err
	}

	props, err := blobClient.GetProperties(ctx, nil)
	if err != nil {
		return azureErrToS3Err(err)
	}

	// Start from the existing metadata so other entries are preserved.
	metadata := props.Metadata
	if metadata == nil {
		metadata = map[string]*string{}
	}
	metadata[string(keyObjRetention)] = backend.GetPtrFromString(string(retention))

	if _, err := blobClient.SetMetadata(ctx, metadata, nil); err != nil {
		return azureErrToS3Err(err)
	}
	return nil
}
// GetObjectRetention returns the retention configuration stored in the blob
// metadata entry keyObjRetention of bucket/object.
//
// The blob properties are fetched first, then the bucket is checked for an
// enabled object lock configuration. A missing (or nil-valued) retention
// entry is reported as ErrNoSuchObjectLockConfiguration. The versionId
// argument is not used.
func (az *Azure) GetObjectRetention(ctx context.Context, bucket, object, versionId string) ([]byte, error) {
	client, err := az.getBlobClient(bucket, object)
	if err != nil {
		return nil, err
	}

	props, err := client.GetProperties(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	err = az.isBucketObjectLockEnabled(ctx, bucket)
	if err != nil {
		return nil, err
	}

	// Metadata values are pointers; guard against a nil value so it is
	// reported as "not configured" instead of causing a nil dereference.
	retentionPtr, ok := props.Metadata[string(keyObjRetention)]
	if !ok || retentionPtr == nil {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchObjectLockConfiguration)
	}

	return []byte(*retentionPtr), nil
}
// PutObjectLegalHold stores the legal hold status of bucket/object in the blob
// metadata entry keyObjLegalHold: "1" when status is true, "0" otherwise.
//
// The bucket must have object lock enabled. The blob's current metadata is
// read first and written back with the legal hold entry added or replaced.
// The versionId argument is not used.
func (az *Azure) PutObjectLegalHold(ctx context.Context, bucket, object, versionId string, status bool) error {
	if err := az.isBucketObjectLockEnabled(ctx, bucket); err != nil {
		return err
	}

	blobClient, err := az.getBlobClient(bucket, object)
	if err != nil {
		return err
	}

	props, err := blobClient.GetProperties(ctx, nil)
	if err != nil {
		return azureErrToS3Err(err)
	}

	statusData := "0"
	if status {
		statusData = "1"
	}

	// Start from the existing metadata so other entries are preserved.
	metadata := props.Metadata
	if metadata == nil {
		metadata = map[string]*string{}
	}
	metadata[string(keyObjLegalHold)] = &statusData

	if _, err := blobClient.SetMetadata(ctx, metadata, nil); err != nil {
		return azureErrToS3Err(err)
	}
	return nil
}
// GetObjectLegalHold reports the legal hold status of bucket/object, read from
// the blob metadata entry keyObjLegalHold. The status is true only when the
// stored value is "1".
//
// The blob properties are fetched first, then the bucket is checked for an
// enabled object lock configuration. A missing (or nil-valued) legal hold
// entry is reported as ErrNoSuchObjectLockConfiguration. The versionId
// argument is not used.
func (az *Azure) GetObjectLegalHold(ctx context.Context, bucket, object, versionId string) (*bool, error) {
	client, err := az.getBlobClient(bucket, object)
	if err != nil {
		return nil, err
	}

	props, err := client.GetProperties(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	err = az.isBucketObjectLockEnabled(ctx, bucket)
	if err != nil {
		return nil, err
	}

	// Metadata values are pointers; guard against a nil value so it is
	// reported as "not configured" instead of causing a nil dereference.
	holdPtr, ok := props.Metadata[string(keyObjLegalHold)]
	if !ok || holdPtr == nil {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchObjectLockConfiguration)
	}

	status := *holdPtr == "1"

	return &status, nil
}
// ChangeBucketOwner sets owner as the bucket's ACL owner by delegating to
// auth.UpdateBucketACLOwner with this backend.
func (az *Azure) ChangeBucketOwner(ctx context.Context, bucket, owner string) error {
	err := auth.UpdateBucketACLOwner(ctx, az, bucket, owner)
	return err
}
// ListBucketsAndOwners lists the containers visible to the gateway's Azure
// client together with the owner recorded in each container's ACL metadata.
//
// The action actually returns the containers owned by the user who
// initialized the gateway.
// TODO: Not sure if there's a way to list all the containers and owners?
//
// On failure the buckets collected so far are returned along with the error.
func (az *Azure) ListBucketsAndOwners(ctx context.Context) (buckets []s3response.Bucket, err error) {
	pager := az.client.NewListContainersPager(&service.ListContainersOptions{
		Include: service.ListContainersInclude{Metadata: true},
	})

	for pager.More() {
		page, pageErr := pager.NextPage(ctx)
		if pageErr != nil {
			return buckets, azureErrToS3Err(pageErr)
		}

		for _, cntr := range page.ContainerItems {
			acl, aclErr := getAclFromMetadata(cntr.Metadata, keyAclLower)
			if aclErr != nil {
				return buckets, aclErr
			}

			entry := s3response.Bucket{
				Name:  *cntr.Name,
				Owner: acl.Owner,
			}
			buckets = append(buckets, entry)
		}
	}

	return buckets, nil
}
// isBucketObjectLockEnabled returns nil when the bucket has an object lock
// configuration stored under keyBucketLock and that configuration is enabled.
//
// A missing or disabled configuration yields
// ErrMissingObjectLockConfiguration; a configuration that cannot be decoded
// yields a wrapped JSON error.
func (az *Azure) isBucketObjectLockEnabled(ctx context.Context, bucket string) error {
	raw, err := az.getContainerMetaData(ctx, bucket, string(keyBucketLock))
	if err != nil {
		return azureErrToS3Err(err)
	}
	if len(raw) == 0 {
		return s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)
	}

	var lockCfg auth.BucketLockConfig
	if err := json.Unmarshal(raw, &lockCfg); err != nil {
		return fmt.Errorf("parse bucket lock config: %w", err)
	}

	if lockCfg.Enabled {
		return nil
	}
	return s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)
}
// getContainerURL joins the service URL (with any trailing slashes removed)
// and the container name with a single "/".
func (az *Azure) getContainerURL(cntr string) string {
	base := strings.TrimRight(az.serviceURL, "/")
	return base + "/" + cntr
}
// getBlobURL builds the blob URL as "<container URL>/<escaped blob name>".
func (az *Azure) getBlobURL(cntr, blb string) string {
	// The blob name is URL-encoded so that special characters such as
	// {, }, #, spaces, etc. survive in the path.
	// NOTE: url.PathEscape escapes the name as a single path segment, so
	// forward slashes in the blob name are encoded as %2F as well.
	escaped := url.PathEscape(blb)
	return az.getContainerURL(cntr) + "/" + escaped
}
// getBlobClient creates a blob client for the given container and blob.
// Credentials are chosen in order of preference: default credentials, then
// a SAS token appended to the URL as a query string, then the shared key.
func (az *Azure) getBlobClient(cntr, blb string) (*blob.Client, error) {
	target := az.getBlobURL(cntr, blb)

	switch {
	case az.defaultCreds != nil:
		return blob.NewClient(target, az.defaultCreds, nil)
	case az.sasToken != "":
		return blob.NewClientWithNoCredential(target+"?"+az.sasToken, nil)
	default:
		return blob.NewClientWithSharedKeyCredential(target, az.sharedkeyCreds, nil)
	}
}
// getContainerClient creates a container client for the given container.
// Credentials are chosen in order of preference: default credentials, then
// a SAS token appended to the URL as a query string, then the shared key.
func (az *Azure) getContainerClient(cntr string) (*container.Client, error) {
	target := az.getContainerURL(cntr)

	switch {
	case az.defaultCreds != nil:
		return container.NewClient(target, az.defaultCreds, nil)
	case az.sasToken != "":
		return container.NewClientWithNoCredential(target+"?"+az.sasToken, nil)
	default:
		return container.NewClientWithSharedKeyCredential(target, az.sharedkeyCreds, nil)
	}
}
// getBlockBlobClient creates a block blob client for the given container
// and blob. Credentials are chosen in order of preference: default
// credentials, then a SAS token appended to the URL as a query string,
// then the shared key.
func (az *Azure) getBlockBlobClient(cntr, blb string) (*blockblob.Client, error) {
	target := az.getBlobURL(cntr, blb)

	switch {
	case az.defaultCreds != nil:
		return blockblob.NewClient(target, az.defaultCreds, nil)
	case az.sasToken != "":
		return blockblob.NewClientWithNoCredential(target+"?"+az.sasToken, nil)
	default:
		return blockblob.NewClientWithSharedKeyCredential(target, az.sharedkeyCreds, nil)
	}
}
// parseMetadata converts a plain string map into the pointer-valued form,
// giving every entry its own freshly allocated string. A nil input yields
// a nil result.
func parseMetadata(m map[string]string) map[string]*string {
	if m == nil {
		return nil
	}

	out := make(map[string]*string)
	for name, value := range m {
		// Allocate per entry so no two keys share the same pointer.
		ptr := new(string)
		*ptr = value
		out[name] = ptr
	}
	return out
}
// parseAndFilterAzMetadata converts pointer-valued metadata into a plain
// string map, dropping every entry whose lower-cased key is present in
// keyTags.Table().
//
// A nil input yields nil. An entry with a nil value pointer is kept with an
// empty string value instead of being dereferenced (which would panic).
func parseAndFilterAzMetadata(m map[string]*string) map[string]string {
	if m == nil {
		return nil
	}

	keywords := keyTags.Table()

	meta := make(map[string]string, len(m))
	for k, v := range m {
		// Keys are compared case-insensitively against the keyword table.
		if _, reserved := keywords[strings.ToLower(k)]; reserved {
			continue
		}
		if v == nil {
			meta[k] = ""
			continue
		}
		meta[k] = *v
	}

	return meta
}
// parseAzMetadata converts pointer-valued metadata into a plain string map.
//
// A nil input yields nil. An entry with a nil value pointer is kept with an
// empty string value instead of being dereferenced (which would panic).
func parseAzMetadata(m map[string]*string) map[string]string {
	if m == nil {
		return nil
	}

	meta := make(map[string]string, len(m))
	for k, v := range m {
		if v == nil {
			meta[k] = ""
			continue
		}
		meta[k] = *v
	}

	return meta
}
// parseAzTags flattens a blob tag set into a key/value map. The result is
// always non-nil, even for an empty tag set. Later duplicates of a key
// overwrite earlier ones.
func parseAzTags(tagSet []*blob.Tags) map[string]string {
	out := make(map[string]string)
	for i := range tagSet {
		entry := tagSet[i]
		out[*entry.Key] = *entry.Value
	}
	return out
}
// getString dereferences str, treating a nil pointer as the empty string.
func getString(str *string) string {
	if str != nil {
		return *str
	}
	return ""
}
// getReadSeekCloser converts an io.Reader into an io.ReadSeekCloser by
// copying the whole input into memory and wrapping the buffered bytes in a
// reader with a no-op Close. A copy failure is returned as-is.
func getReadSeekCloser(input io.Reader) (io.ReadSeekCloser, error) {
	buf := new(bytes.Buffer)
	if _, err := io.Copy(buf, input); err != nil {
		return nil, err
	}

	rdr := bytes.NewReader(buf.Bytes())
	return streaming.NopCloser(rdr), nil
}
// blockIDInt32ToBase64 creates a Base64 (standard encoding) block id from a
// 32 bit integer. The integer is laid out as 4 little-endian bytes, so every
// block id has the same length.
func blockIDInt32ToBase64(blockID int32) string {
	var raw [4]byte
	binary.LittleEndian.PutUint32(raw[:], uint32(blockID))
	return base64.StdEncoding.EncodeToString(raw[:])
}
// decodeBlockId decodes a Base64 (standard encoding) block id back into the
// integer it was created from. The first 4 decoded bytes are interpreted as
// a little-endian unsigned 32 bit value.
//
// An error is returned when blockID is not valid Base64 or when it decodes
// to fewer than 4 bytes; in both cases the returned integer is 0.
func decodeBlockId(blockID string) (int, error) {
	raw, err := base64.StdEncoding.DecodeString(blockID)
	if err != nil {
		return 0, fmt.Errorf("decode block id %q: %w", blockID, err)
	}
	// binary.LittleEndian.Uint32 panics on inputs shorter than 4 bytes.
	if len(raw) < 4 {
		return 0, fmt.Errorf("decode block id %q: expected at least 4 bytes, got %d", blockID, len(raw))
	}

	return int(binary.LittleEndian.Uint32(raw)), nil
}
// encodeBytes returns the standard Base64 encoding of b.
func encodeBytes(b []byte) string {
	enc := base64.StdEncoding
	dst := make([]byte, enc.EncodedLen(len(b)))
	enc.Encode(dst, b)
	return string(dst)
}
// decodeString decodes a standard Base64 string. The empty string decodes
// to an empty, non-nil slice. On a decode failure the slice is nil and the
// decoder's error is returned.
func decodeString(str string) ([]byte, error) {
	if len(str) == 0 {
		return []byte{}, nil
	}

	raw, err := base64.StdEncoding.DecodeString(str)
	if err == nil {
		return raw, nil
	}
	return nil, err
}
// getContainerMetaData reads the container's properties and returns the
// Base64-decoded value stored in its metadata under key. When the container
// has no metadata or the key is absent, an empty non-nil slice is returned.
func (az *Azure) getContainerMetaData(ctx context.Context, bucket, key string) ([]byte, error) {
	cntrClient, err := az.getContainerClient(bucket)
	if err != nil {
		return nil, err
	}

	cntrProps, err := cntrClient.GetProperties(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	// Looking up a key in a nil map is safe and reports "not found", so a
	// missing metadata map and a missing key are handled by the same branch.
	encoded, found := cntrProps.Metadata[key]
	if !found {
		return []byte{}, nil
	}

	decoded, err := decodeString(*encoded)
	if err != nil {
		return nil, err
	}
	return decoded, nil
}
// setContainerMetaData stores value, Base64-encoded, in the container's
// metadata under key. The current metadata is read first and written back
// in full with the one entry added or replaced, so other entries are kept.
//
// Errors from both the read and the write are passed through
// azureErrToS3Err, matching the other Azure calls in this file.
func (az *Azure) setContainerMetaData(ctx context.Context, bucket, key string, value []byte) error {
	client, err := az.getContainerClient(bucket)
	if err != nil {
		return err
	}

	props, err := client.GetProperties(ctx, nil)
	if err != nil {
		return azureErrToS3Err(err)
	}

	mdmap := props.Metadata
	if mdmap == nil {
		// Writing to a nil map panics; start from an empty one.
		mdmap = make(map[string]*string)
	}

	str := encodeBytes(value)
	mdmap[key] = backend.GetPtrFromString(str)

	if _, err := client.SetMetadata(ctx, &container.SetMetadataOptions{Metadata: mdmap}); err != nil {
		return azureErrToS3Err(err)
	}

	return nil
}
// deleteContainerMetaData removes the entry stored under key from the
// container's metadata. The current metadata is read first and written back
// in full without that entry; deleting an absent key is not an error.
//
// Errors from both the read and the write are passed through
// azureErrToS3Err, matching the other Azure calls in this file.
func (az *Azure) deleteContainerMetaData(ctx context.Context, bucket, key string) error {
	client, err := az.getContainerClient(bucket)
	if err != nil {
		return err
	}

	props, err := client.GetProperties(ctx, nil)
	if err != nil {
		return azureErrToS3Err(err)
	}

	mdmap := props.Metadata
	if mdmap == nil {
		mdmap = make(map[string]*string)
	}

	delete(mdmap, key)

	if _, err := client.SetMetadata(ctx, &container.SetMetadataOptions{Metadata: mdmap}); err != nil {
		return azureErrToS3Err(err)
	}

	return nil
}
// evaluateWritePreconditions checks the If-Match / If-None-Match conditions
// of a write against the current state of the object. When both conditions
// are nil nothing is looked up and nil is returned.
//
// Otherwise the object is looked up with HeadObject. A NoSuchKey result is
// not treated as a failure: it is forwarded to
// backend.EvaluateObjectPutPreconditions as "object does not exist" with an
// empty ETag. Any other HeadObject error is returned unchanged.
func (az *Azure) evaluateWritePreconditions(ctx context.Context, bucket, object, ifMatch, ifNoneMatch *string) error {
	if areNils(ifMatch, ifNoneMatch) {
		return nil
	}

	head, err := az.HeadObject(ctx, &s3.HeadObjectInput{Bucket: bucket, Key: object})
	notFound := errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey))
	if err != nil && !notFound {
		return err
	}

	etag := ""
	if head != nil {
		etag = backend.GetStringFromPtr(head.ETag)
	}

	return backend.EvaluateObjectPutPreconditions(etag, ifMatch, ifNoneMatch, !notFound)
}
// getAclFromMetadata extracts the ACL stored in meta under the given key.
// The stored value is Base64-decoded and handed to auth.ParseACL. When the
// key is absent, a pointer to a zero-value ACL is returned.
func getAclFromMetadata(meta map[string]*string, key key) (*auth.ACL, error) {
	encoded, found := meta[string(key)]
	if !found {
		return &auth.ACL{}, nil
	}

	raw, err := decodeString(*encoded)
	if err != nil {
		return nil, err
	}

	parsed, err := auth.ParseACL(raw)
	if err != nil {
		return nil, err
	}
	return &parsed, nil
}
// createMetaTmpPath builds the path
// <metaTmpMultipartPrefix>/<uploadId>/<hex SHA-256 of obj>.
// Hashing the object name keeps the last path element fixed-length and free
// of separators.
func createMetaTmpPath(obj, uploadId string) string {
	digest := sha256.Sum256([]byte(obj))
	hexName := fmt.Sprintf("%x", digest)
	return filepath.Join(string(metaTmpMultipartPrefix), uploadId, hexName)
}
// trackZeroBytePart records a zero-byte upload part in the metadata of the
// multipart upload's temporary blob (see createMetaTmpPath).
// Azure StageBlock rejects Content-Length: 0, so zero-byte parts are stored
// here instead.
//
// The existing metadata is read, the part number is added to the list kept
// under keyMpZeroBytesParts unless it is already present, and the full
// metadata map is written back.
func (az *Azure) trackZeroBytePart(ctx context.Context, bucket, key, uploadId string, partNumber int32) error {
	client, err := az.getBlobClient(bucket, createMetaTmpPath(key, uploadId))
	if err != nil {
		return err
	}

	props, err := client.GetProperties(ctx, nil)
	if err != nil {
		return azureErrToS3Err(err)
	}

	md := props.Metadata
	if md == nil {
		md = make(map[string]*string)
	}

	// Deduplicate: a part number that is already tracked is not added again.
	zeroParts := parseZeroByteParts(md)
	if !slices.Contains(zeroParts, partNumber) {
		zeroParts = append(zeroParts, partNumber)
	}

	encoded := serializeZeroByteParts(zeroParts)
	md[string(keyMpZeroBytesParts)] = &encoded

	_, err = client.SetMetadata(ctx, md, nil)
	return azureErrToS3Err(err)
}
// getZeroByteParts returns the zero-byte part numbers tracked in the
// metadata of the multipart upload's temporary blob (see
// createMetaTmpPath). The result is nil when none are recorded.
func (az *Azure) getZeroByteParts(ctx context.Context, bucket, key, uploadId string) ([]int32, error) {
	client, err := az.getBlobClient(bucket, createMetaTmpPath(key, uploadId))
	if err != nil {
		return nil, err
	}

	props, err := client.GetProperties(ctx, nil)
	if err != nil {
		return nil, azureErrToS3Err(err)
	}

	zeroParts := parseZeroByteParts(props.Metadata)
	return zeroParts, nil
}
// parseZeroByteParts decodes the comma-separated part number list stored in
// meta under keyMpZeroBytesParts. Surrounding whitespace around each entry
// is ignored and entries that are not valid 32 bit integers are skipped.
// A missing, nil or empty value yields nil.
func parseZeroByteParts(meta map[string]*string) []int32 {
	raw, found := meta[string(keyMpZeroBytesParts)]
	if !found || raw == nil || *raw == "" {
		return nil
	}

	var nums []int32
	for field := range strings.SplitSeq(*raw, ",") {
		parsed, err := strconv.ParseInt(strings.TrimSpace(field), 10, 32)
		if err != nil {
			continue
		}
		nums = append(nums, int32(parsed))
	}
	return nums
}
// serializeZeroByteParts renders the part numbers in decimal, joined by
// commas with no surrounding whitespace. An empty or nil list yields "".
func serializeZeroByteParts(parts []int32) string {
	fields := make([]string, 0, len(parts))
	for _, num := range parts {
		fields = append(fields, strconv.FormatInt(int64(num), 10))
	}
	return strings.Join(fields, ",")
}
// checkIfMpExists checks whether the multipart upload exists for the given
// bucket, key and uploadId by probing the upload's temporary blob (see
// createMetaTmpPath). Any failure to fetch that blob's properties is
// reported as a NoSuchUpload error for uploadId.
func (az *Azure) checkIfMpExists(ctx context.Context, bucket, obj, uploadId string) error {
	client, err := az.getBlobClient(bucket, createMetaTmpPath(obj, uploadId))
	if err != nil {
		return err
	}

	if _, err := client.GetProperties(ctx, nil); err != nil {
		return s3err.GetNoSuchUploadErr(uploadId)
	}
	return nil
}
// convertAzureEtag passes the Azure ETag through backend.TrimEtag and
// appends "-1" to the result.
// Azure ETag values are not S3 compatible, so the suffix is appended to
// avoid client SDK ETag validation issues.
func convertAzureEtag(etag *azcore.ETag) string {
	raw := (*string)(etag)
	trimmed := backend.TrimEtag(raw)
	return *trimmed + "-1"
}
// areNils reports whether every pointer in args is nil. It returns true
// when called with no arguments.
func areNils[T any](args ...*T) bool {
	for i := 0; i < len(args); i++ {
		if args[i] != nil {
			return false
		}
	}
	return true
}