From 8cc89fa713775f4eba975ab547fe745f993e66eb Mon Sep 17 00:00:00 2001 From: jonaustin09 Date: Fri, 22 Dec 2023 08:11:21 -0500 Subject: [PATCH] feat: Azure backend implementation --- backend/azure/azure.go | 343 ++++++++++++++++++++++++++++++++++++++- cmd/versitygw/azure.go | 14 +- s3response/s3response.go | 2 +- 3 files changed, 341 insertions(+), 18 deletions(-) diff --git a/backend/azure/azure.go b/backend/azure/azure.go index f72c528..b1e2079 100644 --- a/backend/azure/azure.go +++ b/backend/azure/azure.go @@ -15,13 +15,22 @@ package azure import ( + "bytes" "context" + "encoding/base64" + "encoding/binary" + "errors" "fmt" "io" + "math" "strings" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/versity/versitygw/backend" @@ -32,7 +41,9 @@ import ( type Azure struct { backend.BackendUnsupported - client *azblob.Client + client *azblob.Client + creds *azblob.SharedKeyCredential + serviceURL string } var _ backend.Backend = &Azure{} @@ -48,7 +59,7 @@ func New(accountName, accountKey, serviceURL string) (*Azure, error) { return nil, fmt.Errorf("init client: %w", err) } - return &Azure{client: client}, nil + return &Azure{client: client, serviceURL: serviceURL, creds: cred}, nil } func (az *Azure) Shutdown() {} @@ -88,8 +99,19 @@ func (az *Azure) ListBuckets(ctx context.Context, owner string, isAdmin bool) (s return result, nil } -//func (az *Azure) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) { -//} +func (az *Azure) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) { + client, err := az.getContainerClient(*input.Bucket) + if err != nil { + return nil, err + } + + _, err = client.GetProperties(ctx, nil) + if err != nil { + return nil, azureErrToS3Err(err) + } + + return &s3.HeadBucketOutput{}, nil +} func (az *Azure) DeleteBucket(ctx context.Context, input *s3.DeleteBucketInput) error { _, err := az.client.DeleteContainer(ctx, *input.Bucket, nil) @@ -126,7 +148,10 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput, writer return nil, fmt.Errorf("copy data: %w", err) } - tagcount := int32(*blobDownloadResponse.TagCount) + var tagcount int32 + if blobDownloadResponse.TagCount != nil { + tagcount = int32(*blobDownloadResponse.TagCount) + } return &s3.GetObjectOutput{ AcceptRanges: blobDownloadResponse.AcceptRanges, @@ -141,6 +166,31 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput, writer }, nil } +func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + client, err := az.getBlobClient(*input.Bucket, *input.Key) + if err != nil { + return nil, err + } + + resp, err := client.GetProperties(ctx, nil) + if err != nil { + return nil, azureErrToS3Err(err) + } + + return &s3.HeadObjectOutput{ + AcceptRanges: resp.AcceptRanges, + ContentLength: resp.ContentLength, + ContentType: resp.ContentType, + ContentEncoding: resp.ContentEncoding, + ContentLanguage: resp.ContentLanguage, + ContentDisposition: resp.ContentDisposition, + ETag: (*string)(resp.ETag), + LastModified: resp.LastModified, + Metadata: parseAzMetadata(resp.Metadata), + Expires: resp.ExpiresOn, + }, nil +} + func (az *Azure) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { pager := az.client.NewListBlobsFlatPager(*input.Bucket, &azblob.ListBlobsFlatOptions{ Marker: input.Marker, @@ -259,6 +309,239 @@ func (az *Azure) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput }, nil } +func (az *Azure) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.CopyObjectOutput, error) { + client, err := az.getBlobClient(*input.Bucket, *input.Key) + if err != nil { + return nil, err + } + + tags, err := parseTags(input.Tagging) + if err != nil { + return nil, err + } + + resp, err := client.CopyFromURL(ctx, az.serviceURL+"/"+*input.CopySource, &blob.CopyFromURLOptions{ + BlobTags: tags, + Metadata: parseMetadata(input.Metadata), + }) + if err != nil { + return nil, azureErrToS3Err(err) + } + + return &s3.CopyObjectOutput{ + CopyObjectResult: &types.CopyObjectResult{ + ETag: (*string)(resp.ETag), + LastModified: resp.LastModified, + }, + }, nil +} + +func (az *Azure) PutObjectTagging(ctx context.Context, bucket, object string, tags map[string]string) error { + client, err := az.getBlobClient(bucket, object) + if err != nil { + return err + } + + _, err = client.SetTags(ctx, tags, nil) + if err != nil { + return azureErrToS3Err(err) + } + + return nil +} + +func (az *Azure) GetObjectTagging(ctx context.Context, bucket, object string) (map[string]string, error) { + client, err := az.getBlobClient(bucket, object) + if err != nil { + return nil, err + } + + tags, err := client.GetTags(ctx, nil) + if err != nil { + return nil, azureErrToS3Err(err) + } + + return parseAzTags(tags.BlobTagSet), nil +} + +func (az *Azure) DeleteObjectTagging(ctx context.Context, bucket, object string) error { + client, err := az.getBlobClient(bucket, object) + if err != nil { + return err + } + + //TODO: SDK has a bug here: it recommends to use the method to remove tags by passing an empty map, + // but the method panics because of incorrect implementation + _, err = client.SetTags(ctx, map[string]string{}, nil) + if err != nil { + return azureErrToS3Err(err) + } + + return nil +} + +func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (etag string, err error) { + client, err := az.getBlockBlobClient(*input.Bucket, *input.Key) + if err != nil { + return "", err + } + + rdr, err := getReadSeekCloser(input.Body) + if err != nil { + return "", err + } + + etag = blockIDInt32ToBase64(*input.PartNumber) + _, err = client.StageBlock(ctx, etag, rdr, nil) + if err != nil { + return "", azureErrToS3Err(err) + } + + return etag, nil +} + +func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) { + client, err := az.getBlockBlobClient(*input.Bucket, *input.Key) + if err != nil { + return s3response.CopyObjectResult{}, nil + } + + //TODO: handle block copy by range + //TODO: the action returns not implemented + _, err = client.StageBlockFromURL(ctx, *input.UploadId, *input.CopySource, nil) + if err != nil { + return s3response.CopyObjectResult{}, azureErrToS3Err(err) + } + + return s3response.CopyObjectResult{}, nil +} + +func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) { + client, err := az.getBlockBlobClient(*input.Bucket, *input.Key) + if err != nil { + return s3response.ListPartsResult{}, nil + } + + resp, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil) + if err != nil { + return s3response.ListPartsResult{}, azureErrToS3Err(err) + } + + parts := []s3response.Part{} + for _, el := range resp.BlockList.UncommittedBlocks { + partNumber, err := decodeBlockId(*el.Name) + if err != nil { + return s3response.ListPartsResult{}, err + } + parts = append(parts, s3response.Part{ + Size: *el.Size, + ETag: *el.Name, + PartNumber: partNumber, + }) + } + return s3response.ListPartsResult{ + Bucket: *input.Bucket, + Key: *input.Key, + Parts: parts, + }, nil +} + +func (az *Azure) ListMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) { + client, err := az.getContainerClient(*input.Bucket) + if err != nil { + return s3response.ListMultipartUploadsResult{}, nil + } + pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ + Include: container.ListBlobsInclude{UncommittedBlobs: true}, + Marker: input.KeyMarker, + Prefix: input.Prefix, + }) + + var maxUploads int32 = math.MaxInt32 + if input.MaxUploads != nil { + maxUploads = *input.MaxUploads + } + isTruncated := false + nextKeyMarker := "" + uploads := []s3response.Upload{} + breakFlag := false + + for pager.More() { + resp, err := pager.NextPage(ctx) + if err != nil { + return s3response.ListMultipartUploadsResult{}, azureErrToS3Err(err) + } + for _, el := range resp.Segment.BlobItems { + if el.Properties.AccessTier == nil { + if len(uploads) >= int(*input.MaxUploads) { + breakFlag = true + nextKeyMarker = *el.Name + isTruncated = true + break + } + uploads = append(uploads, s3response.Upload{ + Key: *el.Name, + Initiated: el.Properties.CreationTime.Format(backend.RFC3339TimeFormat), + }) + } + } + if breakFlag { + break + } + } + return s3response.ListMultipartUploadsResult{ + Uploads: uploads, + Bucket: *input.Bucket, + KeyMarker: *input.KeyMarker, + NextKeyMarker: nextKeyMarker, + MaxUploads: int(maxUploads), + Prefix: *input.Prefix, + IsTruncated: isTruncated, + Delimiter: *input.Delimiter, + }, nil +} + +func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error { + _, err := az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil) + if err != nil { + return azureErrToS3Err(err) + } + return nil +} + +func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) { + client, err := az.getBlockBlobClient(*input.Bucket, *input.Key) + if err != nil { + return nil, err + } + blockIds := []string{} + for _, el := range input.MultipartUpload.Parts { + blockIds = append(blockIds, *el.ETag) + } + resp, err := client.CommitBlockList(ctx, blockIds, nil) + if err != nil { + return nil, azureErrToS3Err(err) + } + + return &s3.CompleteMultipartUploadOutput{ + Bucket: input.Bucket, + Key: input.Key, + ETag: (*string)(resp.ETag), + }, nil +} + +func (az *Azure) getBlobClient(container, blb string) (*blob.Client, error) { + return blob.NewClientWithSharedKeyCredential(fmt.Sprintf("%v/%v/%v", az.serviceURL, container, blb), az.creds, nil) +} + +func (az *Azure) getContainerClient(ctr string) (*container.Client, error) { + return container.NewClientWithSharedKeyCredential(fmt.Sprintf("%v/%v", az.serviceURL, ctr), az.creds, nil) +} + +func (az *Azure) getBlockBlobClient(container, blob string) (*blockblob.Client, error) { + return blockblob.NewClientWithSharedKeyCredential(fmt.Sprintf("%v/%v/%v", az.serviceURL, container, blob), az.creds, nil) +} + func parseMetadata(m map[string]string) map[string]*string { if m == nil { return nil @@ -303,6 +586,15 @@ func parseTags(tagstr *string) (map[string]string, error) { return tags, nil } +func parseAzTags(tagSet []*blob.Tags) map[string]string { + tags := map[string]string{} + for _, tag := range tagSet { + tags[*tag.Key] = *tag.Value + } + + return tags +} + func getString(str *string) string { if str == nil { return "" @@ -314,7 +606,42 @@ func getStringPtr(str string) *string { return &str } -func azureErrToS3Err(err error) error { - // TODO: map azure errors to s3 errors - return err +func azureErrToS3Err(apiErr error) error { + var azErr *azcore.ResponseError + if !errors.As(apiErr, &azErr) { + return apiErr + } + + resp := s3err.APIError{ + Code: azErr.ErrorCode, + Description: azErr.RawResponse.Status, + HTTPStatusCode: azErr.StatusCode, + } + fmt.Println(resp) + return resp +} + +func getReadSeekCloser(input io.Reader) (io.ReadSeekCloser, error) { + var buffer bytes.Buffer + _, err := io.Copy(&buffer, input) + if err != nil { + return nil, err + } + + return streaming.NopCloser(bytes.NewReader(buffer.Bytes())), nil +} + +func blockIDInt32ToBase64(blockID int32) string { + binaryBlockID := &[4]byte{} // All block IDs are 4 bytes long + binary.LittleEndian.PutUint32(binaryBlockID[:], uint32(blockID)) + return base64.StdEncoding.EncodeToString(binaryBlockID[:]) +} + +func decodeBlockId(blockID string) (int, error) { + slice, err := base64.StdEncoding.DecodeString(blockID) + if err != nil { + return 0, nil + } + + return int(binary.LittleEndian.Uint32(slice)), nil } diff --git a/cmd/versitygw/azure.go b/cmd/versitygw/azure.go index b608ff9..8fe81f4 100644 --- a/cmd/versitygw/azure.go +++ b/cmd/versitygw/azure.go @@ -36,13 +36,13 @@ func azureCommand() *cli.Command { Name: "account", Usage: "azure account name", EnvVars: []string{"AZ_ACCESS_KEY"}, - Aliases: []string{"s"}, + Aliases: []string{"a"}, Destination: &azAccount, }, &cli.StringFlag{ - Name: "secret", - Usage: "azure secret key", - EnvVars: []string{"AZ_SECRET_KEY"}, + Name: "account-key", + Usage: "azure account key", + EnvVars: []string{"AZ_ACCOUNT_KEY"}, Aliases: []string{"s"}, Destination: &azKey, }, @@ -58,10 +58,6 @@ func azureCommand() *cli.Command { } func runAzure(ctx *cli.Context) error { - if ctx.NArg() == 0 { - return fmt.Errorf("no directory provided for operation") - } - if azServiceURL == "" { // if not otherwise specified, use the typical form: http(s)://.blob.core.windows.net/ azServiceURL = fmt.Sprintf("https://%s.blob.core.windows.net/", azAccount) @@ -69,7 +65,7 @@ func runAzure(ctx *cli.Context) error { be, err := azure.New(azAccount, azKey, azServiceURL) if err != nil { - return fmt.Errorf("init posix: %v", err) + return fmt.Errorf("init azure: %v", err) } return runGateway(ctx.Context, be) diff --git a/s3response/s3response.go b/s3response/s3response.go index 07d3fa6..cb0b401 100644 --- a/s3response/s3response.go +++ b/s3response/s3response.go @@ -73,7 +73,7 @@ type ListMultipartUploadsResult struct { CommonPrefixes []CommonPrefix } -// Upload desribes in progress multipart upload +// Upload describes in progress multipart upload type Upload struct { Key string UploadID string `xml:"UploadId"`