feat: Azure backend implementation

This commit is contained in:
jonaustin09
2023-12-22 08:11:21 -05:00
committed by Ben McClelland
parent 3b945f72fc
commit 8cc89fa713
3 changed files with 341 additions and 18 deletions

View File

@@ -15,13 +15,22 @@
package azure
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"strings"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/versity/versitygw/backend"
@@ -32,7 +41,9 @@ import (
type Azure struct {
backend.BackendUnsupported
client *azblob.Client
client *azblob.Client
creds *azblob.SharedKeyCredential
serviceURL string
}
var _ backend.Backend = &Azure{}
@@ -48,7 +59,7 @@ func New(accountName, accountKey, serviceURL string) (*Azure, error) {
return nil, fmt.Errorf("init client: %w", err)
}
return &Azure{client: client}, nil
return &Azure{client: client, serviceURL: serviceURL, creds: cred}, nil
}
func (az *Azure) Shutdown() {}
@@ -88,8 +99,19 @@ func (az *Azure) ListBuckets(ctx context.Context, owner string, isAdmin bool) (s
return result, nil
}
//func (az *Azure) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
//}
func (az *Azure) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
client, err := az.getContainerClient(*input.Bucket)
if err != nil {
return nil, err
}
_, err = client.GetProperties(ctx, nil)
if err != nil {
return nil, azureErrToS3Err(err)
}
return &s3.HeadBucketOutput{}, nil
}
func (az *Azure) DeleteBucket(ctx context.Context, input *s3.DeleteBucketInput) error {
_, err := az.client.DeleteContainer(ctx, *input.Bucket, nil)
@@ -126,7 +148,10 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput, writer
return nil, fmt.Errorf("copy data: %w", err)
}
tagcount := int32(*blobDownloadResponse.TagCount)
var tagcount int32
if blobDownloadResponse.TagCount != nil {
tagcount = int32(*blobDownloadResponse.TagCount)
}
return &s3.GetObjectOutput{
AcceptRanges: blobDownloadResponse.AcceptRanges,
@@ -141,6 +166,31 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput, writer
}, nil
}
func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
client, err := az.getBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
}
resp, err := client.GetProperties(ctx, nil)
if err != nil {
return nil, azureErrToS3Err(err)
}
return &s3.HeadObjectOutput{
AcceptRanges: resp.AcceptRanges,
ContentLength: resp.ContentLength,
ContentType: resp.ContentType,
ContentEncoding: resp.ContentEncoding,
ContentLanguage: resp.ContentLanguage,
ContentDisposition: resp.ContentDisposition,
ETag: (*string)(resp.ETag),
LastModified: resp.LastModified,
Metadata: parseAzMetadata(resp.Metadata),
Expires: resp.ExpiresOn,
}, nil
}
func (az *Azure) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
pager := az.client.NewListBlobsFlatPager(*input.Bucket, &azblob.ListBlobsFlatOptions{
Marker: input.Marker,
@@ -259,6 +309,239 @@ func (az *Azure) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput
}, nil
}
func (az *Azure) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.CopyObjectOutput, error) {
client, err := az.getBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
}
tags, err := parseTags(input.Tagging)
if err != nil {
return nil, err
}
resp, err := client.CopyFromURL(ctx, az.serviceURL+"/"+*input.CopySource, &blob.CopyFromURLOptions{
BlobTags: tags,
Metadata: parseMetadata(input.Metadata),
})
if err != nil {
return nil, azureErrToS3Err(err)
}
return &s3.CopyObjectOutput{
CopyObjectResult: &types.CopyObjectResult{
ETag: (*string)(resp.ETag),
LastModified: resp.LastModified,
},
}, nil
}
func (az *Azure) PutObjectTagging(ctx context.Context, bucket, object string, tags map[string]string) error {
client, err := az.getBlobClient(bucket, object)
if err != nil {
return err
}
_, err = client.SetTags(ctx, tags, nil)
if err != nil {
return azureErrToS3Err(err)
}
return nil
}
func (az *Azure) GetObjectTagging(ctx context.Context, bucket, object string) (map[string]string, error) {
client, err := az.getBlobClient(bucket, object)
if err != nil {
return nil, err
}
tags, err := client.GetTags(ctx, nil)
if err != nil {
return nil, azureErrToS3Err(err)
}
return parseAzTags(tags.BlobTagSet), nil
}
func (az *Azure) DeleteObjectTagging(ctx context.Context, bucket, object string) error {
client, err := az.getBlobClient(bucket, object)
if err != nil {
return err
}
//TODO: SDK has a bug here: it recommends to use the method to remove tags by passing an empty map,
// but the method panics because of incorrect implementation
_, err = client.SetTags(ctx, map[string]string{}, nil)
if err != nil {
return azureErrToS3Err(err)
}
return nil
}
func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (etag string, err error) {
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return "", err
}
rdr, err := getReadSeekCloser(input.Body)
if err != nil {
return "", err
}
etag = blockIDInt32ToBase64(*input.PartNumber)
_, err = client.StageBlock(ctx, etag, rdr, nil)
if err != nil {
return "", azureErrToS3Err(err)
}
return etag, nil
}
func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) {
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return s3response.CopyObjectResult{}, nil
}
//TODO: handle block copy by range
//TODO: the action returns not implemented
_, err = client.StageBlockFromURL(ctx, *input.UploadId, *input.CopySource, nil)
if err != nil {
return s3response.CopyObjectResult{}, azureErrToS3Err(err)
}
return s3response.CopyObjectResult{}, nil
}
func (az *Azure) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) {
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return s3response.ListPartsResult{}, nil
}
resp, err := client.GetBlockList(ctx, blockblob.BlockListTypeUncommitted, nil)
if err != nil {
return s3response.ListPartsResult{}, azureErrToS3Err(err)
}
parts := []s3response.Part{}
for _, el := range resp.BlockList.UncommittedBlocks {
partNumber, err := decodeBlockId(*el.Name)
if err != nil {
return s3response.ListPartsResult{}, err
}
parts = append(parts, s3response.Part{
Size: *el.Size,
ETag: *el.Name,
PartNumber: partNumber,
})
}
return s3response.ListPartsResult{
Bucket: *input.Bucket,
Key: *input.Key,
Parts: parts,
}, nil
}
func (az *Azure) ListMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) {
client, err := az.getContainerClient(*input.Bucket)
if err != nil {
return s3response.ListMultipartUploadsResult{}, nil
}
pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Include: container.ListBlobsInclude{UncommittedBlobs: true},
Marker: input.KeyMarker,
Prefix: input.Prefix,
})
var maxUploads int32 = math.MaxInt32
if input.MaxUploads != nil {
maxUploads = *input.MaxUploads
}
isTruncated := false
nextKeyMarker := ""
uploads := []s3response.Upload{}
breakFlag := false
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return s3response.ListMultipartUploadsResult{}, azureErrToS3Err(err)
}
for _, el := range resp.Segment.BlobItems {
if el.Properties.AccessTier == nil {
if len(uploads) >= int(*input.MaxUploads) {
breakFlag = true
nextKeyMarker = *el.Name
isTruncated = true
break
}
uploads = append(uploads, s3response.Upload{
Key: *el.Name,
Initiated: el.Properties.CreationTime.Format(backend.RFC3339TimeFormat),
})
}
}
if breakFlag {
break
}
}
return s3response.ListMultipartUploadsResult{
Uploads: uploads,
Bucket: *input.Bucket,
KeyMarker: *input.KeyMarker,
NextKeyMarker: nextKeyMarker,
MaxUploads: int(maxUploads),
Prefix: *input.Prefix,
IsTruncated: isTruncated,
Delimiter: *input.Delimiter,
}, nil
}
func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {
_, err := az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil)
if err != nil {
return azureErrToS3Err(err)
}
return nil
}
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
return nil, err
}
blockIds := []string{}
for _, el := range input.MultipartUpload.Parts {
blockIds = append(blockIds, *el.ETag)
}
resp, err := client.CommitBlockList(ctx, blockIds, nil)
if err != nil {
return nil, azureErrToS3Err(err)
}
return &s3.CompleteMultipartUploadOutput{
Bucket: input.Bucket,
Key: input.Key,
ETag: (*string)(resp.ETag),
}, nil
}
func (az *Azure) getBlobClient(container, blb string) (*blob.Client, error) {
return blob.NewClientWithSharedKeyCredential(fmt.Sprintf("%v/%v/%v", az.serviceURL, container, blb), az.creds, nil)
}
func (az *Azure) getContainerClient(ctr string) (*container.Client, error) {
return container.NewClientWithSharedKeyCredential(fmt.Sprintf("%v/%v", az.serviceURL, ctr), az.creds, nil)
}
func (az *Azure) getBlockBlobClient(container, blob string) (*blockblob.Client, error) {
return blockblob.NewClientWithSharedKeyCredential(fmt.Sprintf("%v/%v/%v", az.serviceURL, container, blob), az.creds, nil)
}
func parseMetadata(m map[string]string) map[string]*string {
if m == nil {
return nil
@@ -303,6 +586,15 @@ func parseTags(tagstr *string) (map[string]string, error) {
return tags, nil
}
func parseAzTags(tagSet []*blob.Tags) map[string]string {
tags := map[string]string{}
for _, tag := range tagSet {
tags[*tag.Key] = *tag.Value
}
return tags
}
func getString(str *string) string {
if str == nil {
return ""
@@ -314,7 +606,42 @@ func getStringPtr(str string) *string {
return &str
}
func azureErrToS3Err(err error) error {
// TODO: map azure errors to s3 errors
return err
func azureErrToS3Err(apiErr error) error {
var azErr *azcore.ResponseError
if !errors.As(apiErr, &azErr) {
return apiErr
}
resp := s3err.APIError{
Code: azErr.ErrorCode,
Description: azErr.RawResponse.Status,
HTTPStatusCode: azErr.StatusCode,
}
fmt.Println(resp)
return resp
}
func getReadSeekCloser(input io.Reader) (io.ReadSeekCloser, error) {
var buffer bytes.Buffer
_, err := io.Copy(&buffer, input)
if err != nil {
return nil, err
}
return streaming.NopCloser(bytes.NewReader(buffer.Bytes())), nil
}
func blockIDInt32ToBase64(blockID int32) string {
binaryBlockID := &[4]byte{} // All block IDs are 4 bytes long
binary.LittleEndian.PutUint32(binaryBlockID[:], uint32(blockID))
return base64.StdEncoding.EncodeToString(binaryBlockID[:])
}
func decodeBlockId(blockID string) (int, error) {
slice, err := base64.StdEncoding.DecodeString(blockID)
if err != nil {
return 0, nil
}
return int(binary.LittleEndian.Uint32(slice)), nil
}

View File

@@ -36,13 +36,13 @@ func azureCommand() *cli.Command {
Name: "account",
Usage: "azure account name",
EnvVars: []string{"AZ_ACCESS_KEY"},
Aliases: []string{"s"},
Aliases: []string{"a"},
Destination: &azAccount,
},
&cli.StringFlag{
Name: "secret",
Usage: "azure secret key",
EnvVars: []string{"AZ_SECRET_KEY"},
Name: "account-key",
Usage: "azure account key",
EnvVars: []string{"AZ_ACCOUNT_KEY"},
Aliases: []string{"s"},
Destination: &azKey,
},
@@ -58,10 +58,6 @@ func azureCommand() *cli.Command {
}
func runAzure(ctx *cli.Context) error {
if ctx.NArg() == 0 {
return fmt.Errorf("no directory provided for operation")
}
if azServiceURL == "" {
// if not otherwise specified, use the typical form: http(s)://<account>.blob.core.windows.net/
azServiceURL = fmt.Sprintf("https://%s.blob.core.windows.net/", azAccount)
@@ -69,7 +65,7 @@ func runAzure(ctx *cli.Context) error {
be, err := azure.New(azAccount, azKey, azServiceURL)
if err != nil {
return fmt.Errorf("init posix: %v", err)
return fmt.Errorf("init azure: %v", err)
}
return runGateway(ctx.Context, be)

View File

@@ -73,7 +73,7 @@ type ListMultipartUploadsResult struct {
CommonPrefixes []CommonPrefix
}
// Upload desribes in progress multipart upload
// Upload describes in progress multipart upload
type Upload struct {
Key string
UploadID string `xml:"UploadId"`