Merge pull request #792 from versity/fix/azure-integration-tests-coverage
Azure integration test issues
.gitignore (vendored)
@@ -45,6 +45,7 @@ tests/.secrets*
# IAM users files often created in testing
users.json
users.json.backup

# env files for testing
**/.env*

backend/azure/azure.go
@@ -17,6 +17,7 @@ package azure
import (
    "bytes"
    "context"
    "crypto/sha256"
    "encoding/base64"
    "encoding/binary"
    "encoding/json"
@@ -25,7 +26,9 @@ import (
    "io"
    "math"
    "os"
    "path/filepath"
    "slices"
    "sort"
    "strconv"
    "strings"
    "time"
@@ -40,6 +43,7 @@ import (
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
    "github.com/google/uuid"
    "github.com/versity/versitygw/auth"
    "github.com/versity/versitygw/backend"
    "github.com/versity/versitygw/s3err"
@@ -52,18 +56,35 @@ import (
type key string

const (
    keyAclCapital   key = "Acl"
    keyAclLower     key = "acl"
    keyOwnership    key = "Ownership"
    keyTags         key = "Tags"
    keyPolicy       key = "Policy"
    keyBucketLock   key = "Bucketlock"
    keyObjRetention key = "Objectretention"
    keyObjLegalHold key = "Objectlegalhold"
    keyAclCapital          key = "Acl"
    keyAclLower            key = "acl"
    keyOwnership           key = "Ownership"
    keyTags                key = "Tags"
    keyPolicy              key = "Policy"
    keyBucketLock          key = "Bucketlock"
    keyObjRetention        key = "Objectretention"
    keyObjLegalHold        key = "Objectlegalhold"
    onameAttr              key = "Objname"
    onameAttrLower         key = "objname"
    metaTmpMultipartPrefix key = ".sgwtmp" + "/multipart"

    defaultContentType = "binary/octet-stream"
)

func (key) Table() map[string]struct{} {
    return map[string]struct{}{
        "acl":               {},
        "ownership":         {},
        "tags":              {},
        "policy":            {},
        "bucketlock":        {},
        "objectretention":   {},
        "objectlegalhold":   {},
        "objname":           {},
        ".sgwtmp/multipart": {},
    }
}

type Azure struct {
    backend.BackendUnsupported
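For context: gateway bookkeeping (ACLs, tags, policy, lock state, the multipart staging prefix) shares one Azure metadata/name namespace with user data, and lookups against the Table() set above are what keep those reserved keys hidden from S3 clients (see parseAzMetadata near the end of this diff). A minimal standalone sketch of that filtering, with illustrative names only:

package main

import (
    "fmt"
    "strings"
)

type key string

// Table mirrors the reserved-key set defined above.
func (key) Table() map[string]struct{} {
    return map[string]struct{}{
        "acl": {}, "ownership": {}, "tags": {}, "policy": {},
        "bucketlock": {}, "objectretention": {}, "objectlegalhold": {},
        "objname": {}, ".sgwtmp/multipart": {},
    }
}

func main() {
    reserved := key("").Table()
    azMeta := map[string]string{"Acl": "{...}", "X-Team": "storage"}
    userMeta := map[string]string{}
    for k, v := range azMeta {
        // case-insensitive match, since Azure normalizes metadata key case
        if _, ok := reserved[strings.ToLower(k)]; ok {
            continue // gateway-internal key, hide from the S3 client
        }
        userMeta[k] = v
    }
    fmt.Println(userMeta) // map[X-Team:storage]
}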
@@ -254,6 +275,9 @@ func (az *Azure) GetBucketOwnershipControls(ctx context.Context, bucket string)
    if err != nil {
        return ownship, err
    }
    if len(ownership) == 0 {
        return ownship, s3err.GetAPIError(s3err.ErrOwnershipControlsNotFound)
    }

    return types.ObjectOwnership(ownership), nil
}
@@ -334,11 +358,11 @@ func (az *Azure) GetBucketTagging(ctx context.Context, bucket string) (map[strin
        return nil, err
    }

    var tags map[string]string
    if len(tagsJson) == 0 {
        return tags, nil
        return nil, s3err.GetAPIError(s3err.ErrBucketTaggingNotFound)
    }

    var tags map[string]string
    err = json.Unmarshal(tagsJson, &tags)
    if err != nil {
        return nil, err
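The fix above changes behavior when no tagging has ever been stored: an empty metadata value now maps to ErrBucketTaggingNotFound (as S3 does) instead of returning a nil map. A tiny self-contained sketch of the JSON round-trip this relies on, illustrative only:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // stored form: tags serialized to JSON in container metadata
    tagsJson, _ := json.Marshal(map[string]string{"team": "storage"})

    // read path: an empty value means the bucket was never tagged
    if len(tagsJson) == 0 {
        fmt.Println("ErrBucketTaggingNotFound")
        return
    }
    var tags map[string]string
    if err := json.Unmarshal(tagsJson, &tags); err != nil {
        panic(err)
    }
    fmt.Println(tags) // map[team:storage]
}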
@@ -391,6 +415,7 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
        TagCount:     &tagcount,
        ContentRange: blobDownloadResponse.ContentRange,
        Body:         blobDownloadResponse.Body,
        StorageClass: types.StorageClassStandard,
    }, nil
}
@@ -419,6 +444,7 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3
            ContentLength: block.Size,
            ETag:          block.Name,
            PartsCount:    &partsCount,
            StorageClass:  types.StorageClassStandard,
        }, nil
    }
}
@@ -447,6 +473,7 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3
        LastModified: resp.LastModified,
        Metadata:     parseAzMetadata(resp.Metadata),
        Expires:      resp.ExpiresOn,
        StorageClass: types.StorageClassStandard,
    }

    status, ok := resp.Metadata[string(keyObjLegalHold)]
@@ -475,64 +502,31 @@ func (az *Azure) GetObjectAttributes(ctx context.Context, input *s3.GetObjectAtt
        Bucket: input.Bucket,
        Key:    input.Key,
    })
    if err == nil {
        return s3response.GetObjectAttributesResult{
            ETag:         data.ETag,
            LastModified: data.LastModified,
            ObjectSize:   data.ContentLength,
            StorageClass: data.StorageClass,
            VersionId:    data.VersionId,
        }, nil
    }
    if !errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)) {
        return s3response.GetObjectAttributesResult{}, err
    }

    resp, err := az.ListParts(ctx, &s3.ListPartsInput{
        Bucket:           input.Bucket,
        Key:              input.Key,
        PartNumberMarker: input.PartNumberMarker,
        MaxParts:         input.MaxParts,
    })
    if errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchUpload)) {
        return s3response.GetObjectAttributesResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
    }
    if err != nil {
        return s3response.GetObjectAttributesResult{}, err
    }

    parts := []types.ObjectPart{}

    for _, p := range resp.Parts {
        partNumber := int32(p.PartNumber)
        size := p.Size

        parts = append(parts, types.ObjectPart{
            Size:       &size,
            PartNumber: &partNumber,
        })
    }

    //TODO: handle PartsCount prop
    return s3response.GetObjectAttributesResult{
        ObjectParts: &s3response.ObjectParts{
            IsTruncated:          resp.IsTruncated,
            MaxParts:             resp.MaxParts,
            PartNumberMarker:     resp.PartNumberMarker,
            NextPartNumberMarker: resp.NextPartNumberMarker,
            Parts:                parts,
        },
        ETag:         data.ETag,
        LastModified: data.LastModified,
        ObjectSize:   data.ContentLength,
        StorageClass: data.StorageClass,
    }, nil
}

func (az *Azure) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) {
    pager := az.client.NewListBlobsFlatPager(*input.Bucket, &azblob.ListBlobsFlatOptions{
    client, err := az.getContainerClient(*input.Bucket)
    if err != nil {
        return s3response.ListObjectsResult{}, nil
    }
    pager := client.NewListBlobsHierarchyPager(*input.Delimiter, &container.ListBlobsHierarchyOptions{
        Marker:     input.Marker,
        MaxResults: input.MaxKeys,
        Prefix:     input.Prefix,
    })

    var objects []s3response.Object
    var cPrefixes []types.CommonPrefix
    var nextMarker *string
    var isTruncated bool
    var maxKeys int32 = math.MaxInt32
@@ -547,13 +541,10 @@ Pager:
        if err != nil {
            return s3response.ListObjectsResult{}, azureErrToS3Err(err)
        }

        for _, v := range resp.Segment.BlobItems {
            if nextMarker == nil && *resp.NextMarker != "" {
                nextMarker = resp.NextMarker
            if len(objects)+len(cPrefixes) >= int(maxKeys) {
                nextMarker = objects[len(objects)-1].Key
                isTruncated = true
            }
            if len(objects) >= int(maxKeys) {
                break Pager
            }
            objects = append(objects, s3response.Object{
@@ -561,7 +552,20 @@ Pager:
                Key:          v.Name,
                LastModified: v.Properties.LastModified,
                Size:         v.Properties.ContentLength,
                StorageClass: types.ObjectStorageClass(*v.Properties.AccessTier),
                StorageClass: types.ObjectStorageClassStandard,
            })
        }
        for _, v := range resp.Segment.BlobPrefixes {
            if *v.Name <= *input.Marker {
                continue
            }
            if len(objects)+len(cPrefixes) >= int(maxKeys) {
                nextMarker = cPrefixes[len(cPrefixes)-1].Prefix
                isTruncated = true
                break Pager
            }
            cPrefixes = append(cPrefixes, types.CommonPrefix{
                Prefix: v.Name,
            })
        }
    }
@@ -569,14 +573,15 @@ Pager:
    // TODO: generate common prefixes when appropriate

    return s3response.ListObjectsResult{
        Contents:    objects,
        Marker:      input.Marker,
        MaxKeys:     input.MaxKeys,
        Name:        input.Bucket,
        NextMarker:  nextMarker,
        Prefix:      input.Prefix,
        IsTruncated: &isTruncated,
        Delimiter:   input.Delimiter,
        Contents:       objects,
        Marker:         input.Marker,
        MaxKeys:        input.MaxKeys,
        Name:           input.Bucket,
        NextMarker:     nextMarker,
        Prefix:         input.Prefix,
        IsTruncated:    &isTruncated,
        Delimiter:      input.Delimiter,
        CommonPrefixes: cPrefixes,
    }, nil
}
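Since the hierarchy pager now yields both blob items and blob prefixes, truncation caps the combined count at MaxKeys and resumes from the last key or prefix emitted. A simplified, self-contained model of that rule (hypothetical helper, not gateway code):

package main

import "fmt"

// truncate models the Pager logic above over an already-merged key list.
func truncate(keys []string, maxKeys int) (page []string, nextMarker string, truncated bool) {
    if len(keys) <= maxKeys {
        return keys, "", false
    }
    page = keys[:maxKeys]
    // resume after the last emitted entry
    return page, page[len(page)-1], true
}

func main() {
    page, next, more := truncate([]string{"a", "b", "c"}, 2)
    fmt.Println(page, next, more) // [a b] b true
}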
@@ -587,13 +592,18 @@ func (az *Azure) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input
    } else {
        marker = *input.StartAfter
    }
    pager := az.client.NewListBlobsFlatPager(*input.Bucket, &azblob.ListBlobsFlatOptions{
    client, err := az.getContainerClient(*input.Bucket)
    if err != nil {
        return s3response.ListObjectsV2Result{}, nil
    }
    pager := client.NewListBlobsHierarchyPager(*input.Delimiter, &container.ListBlobsHierarchyOptions{
        Marker:     &marker,
        MaxResults: input.MaxKeys,
        Prefix:     input.Prefix,
    })

    var objects []s3response.Object
    var cPrefixes []types.CommonPrefix
    var nextMarker *string
    var isTruncated bool
    var maxKeys int32 = math.MaxInt32
@@ -609,26 +619,34 @@ Pager:
        return s3response.ListObjectsV2Result{}, azureErrToS3Err(err)
    }
    for _, v := range resp.Segment.BlobItems {
        if nextMarker == nil && *resp.NextMarker != "" {
            nextMarker = resp.NextMarker
        if len(objects)+len(cPrefixes) >= int(maxKeys) {
            nextMarker = objects[len(objects)-1].Key
            isTruncated = true
        }
        if len(objects) >= int(maxKeys) {
            break Pager
        }
        nextMarker = resp.NextMarker
        objects = append(objects, s3response.Object{
            ETag:         (*string)(v.Properties.ETag),
            Key:          v.Name,
            LastModified: v.Properties.LastModified,
            Size:         v.Properties.ContentLength,
            StorageClass: types.ObjectStorageClass(*v.Properties.AccessTier),
            StorageClass: types.ObjectStorageClassStandard,
        })
    }
    for _, v := range resp.Segment.BlobPrefixes {
        if *v.Name <= marker {
            continue
        }
        if len(objects)+len(cPrefixes) >= int(maxKeys) {
            nextMarker = cPrefixes[len(cPrefixes)-1].Prefix
            isTruncated = true
            break Pager
        }
        cPrefixes = append(cPrefixes, types.CommonPrefix{
            Prefix: v.Name,
        })
    }
}

// TODO: generate common prefixes when appropriate

return s3response.ListObjectsV2Result{
    Contents:          objects,
    ContinuationToken: input.ContinuationToken,
@@ -638,6 +656,7 @@ Pager:
    Prefix:         input.Prefix,
    IsTruncated:    &isTruncated,
    Delimiter:      input.Delimiter,
    CommonPrefixes: cPrefixes,
}, nil
}
@@ -687,13 +706,21 @@ func (az *Azure) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput
}

func (az *Azure) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.CopyObjectOutput, error) {
    mdmap, err := az.getContainerMetaDataMap(ctx, *input.Bucket)
    bclient, err := az.getBlobClient(*input.Bucket, *input.Key)
    if err != nil {
        return nil, err
    }

    if strings.Join([]string{*input.Bucket, *input.Key}, "/") == *input.CopySource && isMetaSame(mdmap, input.Metadata) {
        return nil, s3err.GetAPIError(s3err.ErrInvalidCopyDest)
    if strings.Join([]string{*input.Bucket, *input.Key}, "/") == *input.CopySource {
        props, err := bclient.GetProperties(ctx, nil)
        if err != nil {
            return nil, azureErrToS3Err(err)
        }

        mdmap := props.Metadata
        if isMetaSame(mdmap, input.Metadata) {
            return nil, s3err.GetAPIError(s3err.ErrInvalidCopyDest)
        }
    }

    tags, err := parseTags(input.Tagging)
@@ -701,11 +728,6 @@ func (az *Azure) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3
        return nil, err
    }

    bclient, err := az.getBlobClient(*input.Bucket, *input.Key)
    if err != nil {
        return nil, err
    }

    resp, err := bclient.CopyFromURL(ctx, az.serviceURL+"/"+*input.CopySource, &blob.CopyFromURLOptions{
        BlobTags: tags,
        Metadata: parseMetadata(input.Metadata),
@@ -765,26 +787,96 @@ func (az *Azure) DeleteObjectTagging(ctx context.Context, bucket, object string)
}

func (az *Azure) CreateMultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
    // Multipart upload starts with UploadPart action so there is no
    // correlating function for creating multipart uploads.
    // TODO: since azure only allows for a single multipart upload
    // for an object name at a time, we need to send an error back to
    // the client if there is already an outstanding upload in progress
    // for this object.
    // Alternatively, is there something we can do with upload ids to
    // keep concurrent uploads unique still? I haven't found an efficient
    // way to rename final objects.
    if input.ObjectLockLegalHoldStatus != "" || input.ObjectLockMode != "" {
        bucketLock, err := az.getContainerMetaData(ctx, *input.Bucket, string(keyBucketLock))
        if err != nil {
            return s3response.InitiateMultipartUploadResult{}, azureErrToS3Err(err)
        }

        if len(bucketLock) == 0 {
            return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrInvalidBucketObjectLockConfiguration)
        }

        var bucketLockConfig auth.BucketLockConfig
        if err := json.Unmarshal(bucketLock, &bucketLockConfig); err != nil {
            return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("parse bucket lock config: %w", err)
        }

        if !bucketLockConfig.Enabled {
            return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrInvalidBucketObjectLockConfiguration)
        }
    }

    meta := parseMetadata(input.Metadata)
    meta[string(onameAttr)] = input.Key

    // parse object tags
    tagsStr := getString(input.Tagging)
    tags := map[string]string{}
    if tagsStr != "" {
        tagParts := strings.Split(tagsStr, "&")
        for _, prt := range tagParts {
            p := strings.Split(prt, "=")
            if len(p) != 2 {
                return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrInvalidTag)
            }
            if len(p[0]) > 128 || len(p[1]) > 256 {
                return s3response.InitiateMultipartUploadResult{}, s3err.GetAPIError(s3err.ErrInvalidTag)
            }
            tags[p[0]] = p[1]
        }
    }

    // set blob legal hold status in metadata
    if input.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
        meta[string(keyObjLegalHold)] = backend.GetStringPtr("1")
    }

    // set blob retention date
    if input.ObjectLockMode != "" {
        retention := types.ObjectLockRetention{
            Mode:            types.ObjectLockRetentionMode(input.ObjectLockMode),
            RetainUntilDate: input.ObjectLockRetainUntilDate,
        }
        retParsed, err := json.Marshal(retention)
        if err != nil {
            return s3response.InitiateMultipartUploadResult{}, azureErrToS3Err(err)
        }
        meta[string(keyObjRetention)] = backend.GetStringPtr(string(retParsed))
    }

    uploadId := uuid.New().String()

    tmpPath := createMetaTmpPath(*input.Key, uploadId)

    opts := &blockblob.UploadBufferOptions{
        Metadata: meta,
        Tags:     tags,
    }
    if getString(input.ContentType) != "" {
        opts.HTTPHeaders = &blob.HTTPHeaders{
            BlobContentType: input.ContentType,
        }
    }

    // Create an empty blob in .sgwtmp/multipart/<uploadId>/<object hash>
    // The blob indicates multipart upload initialization and holds the mp metadata
    // e.g. tagging, content-type, metadata, object lock status ...
    _, err := az.client.UploadBuffer(ctx, *input.Bucket, tmpPath, []byte{}, opts)
    if err != nil {
        return s3response.InitiateMultipartUploadResult{}, azureErrToS3Err(err)
    }

    return s3response.InitiateMultipartUploadResult{
        Bucket:   *input.Bucket,
        Key:      *input.Key,
        UploadId: *input.Key,
        UploadId: uploadId,
    }, nil
}
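The Tagging input parsed above is S3's "k1=v1&k2=v2" form, with AWS's limits of 128 characters per key and 256 per value. The same validation in isolation (hypothetical helper for illustration; the gateway does this inline):

package main

import (
    "errors"
    "fmt"
    "strings"
)

func parseTagString(s string) (map[string]string, error) {
    tags := map[string]string{}
    if s == "" {
        return tags, nil
    }
    for _, prt := range strings.Split(s, "&") {
        p := strings.Split(prt, "=")
        // malformed pair or oversized key/value -> InvalidTag
        if len(p) != 2 || len(p[0]) > 128 || len(p[1]) > 256 {
            return nil, errors.New("InvalidTag")
        }
        tags[p[0]] = p[1]
    }
    return tags, nil
}

func main() {
    tags, err := parseTagString("env=test&team=storage")
    fmt.Println(tags, err) // map[env:test team:storage] <nil>
}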

// Each part is translated into an uncommitted block in a newly created blob in staging area
func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (etag string, err error) {
    client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
    if err != nil {
    if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
        return "", err
    }

@@ -797,6 +889,11 @@ func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (eta
        return "", err
    }

    client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
    if err != nil {
        return "", err
    }

    // block id serves as etag here
    etag = blockIDInt32ToBase64(*input.PartNumber)
    _, err = client.StageBlock(ctx, etag, rdr, nil)
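Azure requires block IDs within a blob to be base64 strings of equal pre-encoding length, which is why a fixed-width encoding of the part number can double as the part's ETag and survive a later decodeBlockId in ListParts. One plausible shape for the pair, consistent with the encoding/binary and encoding/base64 imports added above (the gateway's actual byte order is not shown in this diff):

package main

import (
    "encoding/base64"
    "encoding/binary"
    "fmt"
)

func blockIDInt32ToBase64(n int32) string {
    var b [4]byte
    binary.LittleEndian.PutUint32(b[:], uint32(n)) // assumed byte order
    return base64.StdEncoding.EncodeToString(b[:])
}

func decodeBlockId(id string) (int, error) {
    b, err := base64.StdEncoding.DecodeString(id)
    if err != nil || len(b) != 4 {
        return 0, fmt.Errorf("invalid block id %q", id)
    }
    return int(binary.LittleEndian.Uint32(b)), nil
}

func main() {
    id := blockIDInt32ToBase64(7)
    n, _ := decodeBlockId(id)
    fmt.Println(id, n) // BwAAAA== 7
}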
@@ -813,10 +910,14 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp
        return s3response.CopyObjectResult{}, nil
    }

    if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
        return s3response.CopyObjectResult{}, err
    }

    eTag := blockIDInt32ToBase64(*input.PartNumber)
    //TODO: handle block copy by range
    //TODO: the action returns not implemented on azurite, maybe in production this will work?
    // UploadId here is the source block id
    _, err = client.StageBlockFromURL(ctx, *input.UploadId, *input.CopySource, nil)
    _, err = client.StageBlockFromURL(ctx, eTag, *input.CopySource, nil)
    if err != nil {
        return s3response.CopyObjectResult{}, parseMpError(err)
    }
@@ -826,15 +927,14 @@ func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInp
|
||||
|
||||
// Lists all uncommitted parts from the blob
|
||||
func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) {
|
||||
if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
|
||||
return s3response.ListPartsResult{}, err
|
||||
}
|
||||
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
|
||||
if err != nil {
|
||||
return s3response.ListPartsResult{}, nil
|
||||
}
|
||||
|
||||
resp, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
|
||||
if err != nil {
|
||||
return s3response.ListPartsResult{}, parseMpError(err)
|
||||
}
|
||||
var partNumberMarker int
|
||||
var nextPartNumberMarker int
|
||||
var maxParts int32 = math.MaxInt32
|
||||
@@ -850,13 +950,28 @@ func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3res
        maxParts = *input.MaxParts
    }

    resp, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
    if err != nil {
        // If the mp exists but the client returns 'NoSuchKey' error, return empty result
        if errors.Is(azureErrToS3Err(err), s3err.GetAPIError(s3err.ErrNoSuchKey)) {
            return s3response.ListPartsResult{
                Bucket:           *input.Bucket,
                Key:              *input.Key,
                PartNumberMarker: partNumberMarker,
                IsTruncated:      isTruncated,
                MaxParts:         int(maxParts),
                StorageClass:     types.StorageClassStandard,
            }, nil
        }
    }

    parts := []s3response.Part{}
    for _, el := range resp.UncommittedBlocks {
        partNumber, err := decodeBlockId(*el.Name)
        if err != nil {
            return s3response.ListPartsResult{}, err
        }
        if partNumberMarker != 0 && partNumberMarker >= partNumber {
        if partNumberMarker >= partNumber {
            continue
        }
        parts = append(parts, s3response.Part{
@@ -879,29 +994,29 @@ func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3res
        PartNumberMarker: partNumberMarker,
        IsTruncated:      isTruncated,
        MaxParts:         int(maxParts),
        StorageClass:     types.StorageClassStandard,
    }, nil
}

// Lists all block blobs, which have uncommitted blocks
// Lists all the multipart uploads initiated with .sgwtmp/multipart prefix
func (az *Azure) ListMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) {
    client, err := az.getContainerClient(*input.Bucket)
    if err != nil {
        return s3response.ListMultipartUploadsResult{}, err
    }
    pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
        Include: container.ListBlobsInclude{UncommittedBlobs: true},
        Marker:  input.KeyMarker,
        Prefix:  input.Prefix,
    })

    var maxUploads int32
    if input.MaxUploads != nil {
        maxUploads = *input.MaxUploads
    }
    isTruncated := false
    nextKeyMarker := ""
    uploads := []s3response.Upload{}
    breakFlag := false

    var uploadIDMarker string
    if input.UploadIdMarker != nil {
        uploadIDMarker = *input.UploadIdMarker
    }
    uploadIdMarkerFound := false
    prefix := string(metaTmpMultipartPrefix)

    pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
        Prefix: &prefix,
    })

    for pager.More() {
        resp, err := pager.NextPage(ctx)
@@ -909,49 +1024,131 @@ func (az *Azure) ListMultipartUploads(ctx context.Context, input *s3.ListMultipa
        return s3response.ListMultipartUploadsResult{}, azureErrToS3Err(err)
    }
    for _, el := range resp.Segment.BlobItems {
        if el.Properties.AccessTier == nil {
            if len(uploads) >= int(*input.MaxUploads) && maxUploads != 0 {
                breakFlag = true
                nextKeyMarker = *el.Name
                isTruncated = true
                break
            }
            uploads = append(uploads, s3response.Upload{
                Key:       *el.Name,
                Initiated: *el.Properties.CreationTime,
            })
        key, ok := el.Metadata[string(onameAttrLower)]
        if !ok {
            continue
        }
        }
        if breakFlag {
            break
        if *key <= *input.KeyMarker {
            continue
        }
        if input.Prefix != nil && !strings.HasPrefix(*key, *input.Prefix) {
            continue
        }

        path := filepath.Clean(*el.Name)
        parts := strings.Split(path, "/")
        uploadId := parts[2]

        uploads = append(uploads, s3response.Upload{
            Key:          *key,
            Initiated:    *el.Properties.CreationTime,
            UploadID:     uploadId,
            StorageClass: types.StorageClassStandard,
        })
    }
}
return s3response.ListMultipartUploadsResult{
    Uploads:       uploads,
    Bucket:        *input.Bucket,
    KeyMarker:     *input.KeyMarker,
    NextKeyMarker: nextKeyMarker,
    MaxUploads:    int(maxUploads),
    Prefix:        *input.Prefix,
    IsTruncated:   isTruncated,
    Delimiter:     *input.Delimiter,
}, nil
maxUploads := 1000
if input.MaxUploads != nil {
    maxUploads = int(*input.MaxUploads)
}
if *input.KeyMarker != "" && uploadIDMarker != "" && !uploadIdMarkerFound {
    return s3response.ListMultipartUploadsResult{
        Bucket:         *input.Bucket,
        Delimiter:      *input.Delimiter,
        KeyMarker:      *input.KeyMarker,
        MaxUploads:     maxUploads,
        Prefix:         *input.Prefix,
        UploadIDMarker: *input.UploadIdMarker,
        Uploads:        []s3response.Upload{},
    }, nil
}

sort.SliceStable(uploads, func(i, j int) bool {
    return uploads[i].Key < uploads[j].Key
})

if *input.KeyMarker != "" && *input.UploadIdMarker != "" {
    // the uploads are already filtered by keymarker
    // filter the uploads by uploadIdMarker
    for i, upl := range uploads {
        if upl.UploadID == uploadIDMarker {
            uploads = uploads[i+1:]
            break
        }
    }
}

if len(uploads) <= maxUploads {
    return s3response.ListMultipartUploadsResult{
        Bucket:         *input.Bucket,
        Delimiter:      *input.Delimiter,
        KeyMarker:      *input.KeyMarker,
        MaxUploads:     maxUploads,
        Prefix:         *input.Prefix,
        UploadIDMarker: *input.UploadIdMarker,
        Uploads:        uploads,
    }, nil
} else {
    resUploads := uploads[:maxUploads]
    return s3response.ListMultipartUploadsResult{
        Bucket:             *input.Bucket,
        Delimiter:          *input.Delimiter,
        KeyMarker:          *input.KeyMarker,
        NextKeyMarker:      resUploads[len(resUploads)-1].Key,
        MaxUploads:         maxUploads,
        Prefix:             *input.Prefix,
        UploadIDMarker:     *input.UploadIdMarker,
        NextUploadIDMarker: resUploads[len(resUploads)-1].UploadID,
        IsTruncated:        true,
        Uploads:            resUploads,
    }, nil
}
}
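Stripped of the Azure plumbing, the listing above reduces to: collect the sentinel blobs under .sgwtmp/multipart, sort by object key, skip past the upload-id marker, then cap at MaxUploads (defaulting to 1000). A simplified model with illustrative types:

package main

import (
    "fmt"
    "sort"
)

type upload struct{ Key, UploadID string }

func page(uploads []upload, uploadIDMarker string, maxUploads int) ([]upload, bool) {
    sort.SliceStable(uploads, func(i, j int) bool { return uploads[i].Key < uploads[j].Key })
    if uploadIDMarker != "" {
        for i, u := range uploads {
            if u.UploadID == uploadIDMarker {
                uploads = uploads[i+1:] // resume after the marker
                break
            }
        }
    }
    if len(uploads) <= maxUploads {
        return uploads, false
    }
    return uploads[:maxUploads], true // truncated
}

func main() {
    ups := []upload{{"b", "2"}, {"a", "1"}, {"c", "3"}}
    res, truncated := page(ups, "1", 1)
    fmt.Println(res, truncated) // [{b 2}] true
}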

// Deletes the block blob with committed/uncommitted blocks
// Cleans up the initiated multipart upload in .sgwtmp namespace
func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {
    // TODO: need to verify this blob has uncommitted blocks?
    _, err := az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil)
    tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)
    _, err := az.client.DeleteBlob(ctx, *input.Bucket, tmpPath, nil)
    if err != nil {
        return parseMpError(err)
    }

    // Cleanup the uploaded parts
    _, err = az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil)
    if err != nil {
        err = azureErrToS3Err(err)
        if errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)) {
            return nil
        }

        return err
    }

    return nil
}

// Commits all the uncommitted blocks inside the block blob
// And moves the block blob from staging area into the blobs list
// And moves the block blob from staging area into the blobs list.
// Copies the multipart metadata from .sgwtmp namespace into the newly created blob
// Deletes the multipart upload 'blob' from .sgwtmp namespace
// It indicates the end of the multipart upload
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
    tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)
    blobClient, err := az.getBlobClient(*input.Bucket, tmpPath)
    if err != nil {
        return nil, err
    }

    props, err := blobClient.GetProperties(ctx, nil)
    if err != nil {
        return nil, parseMpError(err)
    }
    tags, err := blobClient.GetTags(ctx, nil)
    if err != nil {
        return nil, parseMpError(err)
    }

    client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
    if err != nil {
        return nil, err
@@ -988,7 +1185,21 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
        blockIds = append(blockIds, *block.Name)
    }

    resp, err := client.CommitBlockList(ctx, blockIds, nil)
    opts := &blockblob.CommitBlockListOptions{
        Metadata: props.Metadata,
        Tags:     parseAzTags(tags.BlobTagSet),
    }
    opts.HTTPHeaders = &blob.HTTPHeaders{
        BlobContentType: props.ContentType,
    }

    resp, err := client.CommitBlockList(ctx, blockIds, opts)
    if err != nil {
        return nil, parseMpError(err)
    }

    // cleanup the multipart upload
    _, err = blobClient.Delete(ctx, nil)
    if err != nil {
        return nil, parseMpError(err)
    }
@@ -1313,9 +1524,15 @@ func parseAzMetadata(m map[string]*string) map[string]string {
        return nil
    }

    keywords := keyTags.Table()

    meta := make(map[string]string)

    for k, v := range m {
        _, ok := keywords[strings.ToLower(k)]
        if ok {
            continue
        }
        meta[k] = *v
    }
    return meta
@@ -1427,20 +1644,6 @@ func (az *Azure) getContainerMetaData(ctx context.Context, bucket, key string) (
    return value, nil
}

func (az *Azure) getContainerMetaDataMap(ctx context.Context, bucket string) (map[string]*string, error) {
    client, err := az.getContainerClient(bucket)
    if err != nil {
        return nil, err
    }

    props, err := client.GetProperties(ctx, nil)
    if err != nil {
        return nil, azureErrToS3Err(err)
    }

    return props.Metadata, nil
}

func (az *Azure) setContainerMetaData(ctx context.Context, bucket, key string, value []byte) error {
    client, err := az.getContainerClient(bucket)
    if err != nil {
@@ -1517,7 +1720,7 @@ func getAclFromMetadata(meta map[string]*string, key key) (*auth.ACL, error) {
}

func isMetaSame(azMeta map[string]*string, awsMeta map[string]string) bool {
    if len(azMeta) != len(awsMeta)+1 {
    if len(azMeta) != len(awsMeta) {
        return false
    }
@@ -1533,3 +1736,24 @@ func isMetaSame(azMeta map[string]*string, awsMeta map[string]string) bool {

    return true
}

func createMetaTmpPath(obj, uploadId string) string {
    objNameSum := sha256.Sum256([]byte(obj))
    return filepath.Join(string(metaTmpMultipartPrefix), uploadId, fmt.Sprintf("%x", objNameSum))
}

// Checks if the multipart upload exists with the given bucket, key and uploadId
func (az *Azure) checkIfMpExists(ctx context.Context, bucket, obj, uploadId string) error {
    tmpPath := createMetaTmpPath(obj, uploadId)
    blobClient, err := az.getBlobClient(bucket, tmpPath)
    if err != nil {
        return err
    }

    _, err = blobClient.GetProperties(ctx, nil)
    if err != nil {
        return s3err.GetAPIError(s3err.ErrNoSuchUpload)
    }

    return nil
}
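The sentinel layout these helpers agree on is .sgwtmp/multipart/<uploadId>/<hex sha256 of the object key>, which is also why ListMultipartUploads above can recover the upload ID as the path's second component. A runnable sketch of just the path construction (prefix inlined):

package main

import (
    "crypto/sha256"
    "fmt"
    "path/filepath"
)

func createMetaTmpPath(obj, uploadId string) string {
    objNameSum := sha256.Sum256([]byte(obj))
    return filepath.Join(".sgwtmp/multipart", uploadId, fmt.Sprintf("%x", objNameSum))
}

func main() {
    p := createMetaTmpPath("photos/cat.png", "7b0c8f1e")
    fmt.Println(p)
    // .sgwtmp/multipart/7b0c8f1e/<64 hex chars>
    // strings.Split(p, "/")[2] == "7b0c8f1e", matching the parts[2]
    // lookup in ListMultipartUploads
}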
@@ -132,7 +132,6 @@ func TestPutObject(s *S3Conf) {
    PutObject_special_chars(s)
    PutObject_invalid_long_tags(s)
    PutObject_missing_object_lock_retention_config(s)
    PutObject_name_too_long(s)
    PutObject_with_object_lock(s)
    PutObject_success(s)
    PutObject_invalid_credentials(s)
@@ -192,7 +191,6 @@ func TestListObjectsV2(s *S3Conf) {

func TestDeleteObject(s *S3Conf) {
    DeleteObject_non_existing_object(s)
    DeleteObject_name_too_long(s)
    DeleteObject_non_existing_dir_object(s)
    DeleteObject_success(s)
    DeleteObject_success_status_code(s)
@@ -480,6 +478,9 @@ func TestPosix(s *S3Conf) {
    PutObject_overwrite_file_obj(s)
    PutObject_dir_obj_with_data(s)
    CreateMultipartUpload_dir_obj(s)
    PutObject_name_too_long(s)
    HeadObject_name_too_long(s)
    DeleteObject_name_too_long(s)
}

func TestIAM(s *S3Conf) {
File diff suppressed because it is too large
tests/integration/utils.go

@@ -29,6 +29,7 @@ import (
    "net/url"
    "os"
    "os/exec"
    "sort"
    "strings"
    "time"
@@ -282,19 +283,34 @@ func checkSdkApiErr(err error, code string) error {
    return err
}

func putObjects(client *s3.Client, objs []string, bucket string) error {
func putObjects(client *s3.Client, objs []string, bucket string) ([]types.Object, error) {
    var contents []types.Object
    var size int64
    for _, key := range objs {
        ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
        _, err := client.PutObject(ctx, &s3.PutObjectInput{
        res, err := client.PutObject(ctx, &s3.PutObjectInput{
            Key:    &key,
            Bucket: &bucket,
        })
        cancel()
        if err != nil {
            return err
            return nil, err
        }
        k := key
        etag := strings.Trim(*res.ETag, `"`)
        contents = append(contents, types.Object{
            Key:          &k,
            ETag:         &etag,
            StorageClass: types.ObjectStorageClassStandard,
            Size:         &size,
        })
    }
    return nil

    sort.SliceStable(contents, func(i, j int) bool {
        return *contents[i].Key < *contents[j].Key
    })

    return contents, nil
}
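With the new signature, callers no longer need createEmptyObjectsList (removed below): the key-sorted []types.Object that putObjects returns can be compared directly against a listing. A sketch of the intended test-side usage, assuming the putObjects and compareObjects helpers from this file (imports: context, fmt, and the aws-sdk-go-v2 s3 package):

func verifyListObjectsV2(ctx context.Context, client *s3.Client, bucket string) error {
    // contents comes back sorted by key with real ETags filled in
    contents, err := putObjects(client, []string{"bar", "baz", "foo"}, bucket)
    if err != nil {
        return err
    }
    out, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{Bucket: &bucket})
    if err != nil {
        return err
    }
    if !compareObjects(contents, out.Contents) {
        return fmt.Errorf("expected the listing to match %v", contents)
    }
    return nil
}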
func putObjectWithData(lgth int64, input *s3.PutObjectInput, client *s3.Client) (csum [32]byte, data []byte, err error) {
@@ -486,22 +502,6 @@ func compareObjects(list1, list2 []types.Object) bool {
    return true
}

// Creates a list of types.Object with the provided objects keys: objs []string
func createEmptyObjectsList(objs []string) (result []types.Object) {
    size := int64(0)
    for _, obj := range objs {
        o := obj
        result = append(result, types.Object{
            Key:          &o,
            Size:         &size,
            StorageClass: types.ObjectStorageClassStandard,
            ETag:         &emptyObjETag,
        })
    }

    return
}

func comparePrefixes(list1 []string, list2 []types.CommonPrefix) bool {
    if len(list1) != len(list2) {
        return false