Merge pull request #276 from versity/ben/s3

feat: s3proxy backend
This commit is contained in:
Ben McClelland
2023-10-11 09:55:55 -07:00
committed by GitHub
6 changed files with 679 additions and 8 deletions

View File

@@ -171,12 +171,16 @@ func CheckIfAccountsExist(accs []string, iam IAMService) ([]string, error) {
for _, acc := range accs {
_, err := iam.GetUserAccount(acc)
if err != nil && err != ErrNoSuchUser {
if err != nil {
if err == ErrNoSuchUser {
result = append(result, acc)
continue
}
if err == ErrNotSupported {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
return nil, fmt.Errorf("check user account: %w", err)
}
if err == ErrNoSuchUser {
result = append(result, acc)
}
}
return result, nil
}

View File

@@ -14,26 +14,30 @@
package auth
import "fmt"
import (
"errors"
)
// IAMServiceSingle manages the single tenant (root-only) IAM service
type IAMServiceSingle struct{}
var _ IAMService = &IAMServiceSingle{}
var ErrNotSupported = errors.New("method is not supported")
// CreateAccount not valid in single tenant mode
func (IAMServiceSingle) CreateAccount(account Account) error {
return fmt.Errorf("create user not valid in single tenant mode")
return ErrNotSupported
}
// GetUserAccount no accounts in single tenant mode
func (IAMServiceSingle) GetUserAccount(access string) (Account, error) {
return Account{}, ErrNoSuchUser
return Account{}, ErrNotSupported
}
// DeleteUserAccount no accounts in single tenant mode
func (IAMServiceSingle) DeleteUserAccount(access string) error {
return ErrNoSuchUser
return ErrNotSupported
}
// ListUserAccounts no accounts in single tenant mode

86
backend/s3proxy/client.go Normal file
View File

@@ -0,0 +1,86 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3proxy
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/middleware"
"github.com/versity/versitygw/auth"
)
func (s *S3be) getClientFromCtx(ctx context.Context) (*s3.Client, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
return nil, fmt.Errorf("invalid account in context")
}
cfg, err := s.getConfig(ctx, acct.Access, acct.Secret)
if err != nil {
return nil, err
}
return s3.NewFromConfig(cfg), nil
}
func (s *S3be) getConfig(ctx context.Context, access, secret string) (aws.Config, error) {
creds := credentials.NewStaticCredentialsProvider(access, secret, "")
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: s.sslSkipVerify},
}
client := &http.Client{Transport: tr}
opts := []func(*config.LoadOptions) error{
config.WithRegion(s.awsRegion),
config.WithCredentialsProvider(creds),
config.WithHTTPClient(client),
}
if s.endpoint != "" {
opts = append(opts,
config.WithEndpointResolverWithOptions(s))
}
if s.disableChecksum {
opts = append(opts,
config.WithAPIOptions([]func(*middleware.Stack) error{v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware}))
}
if s.debug {
opts = append(opts,
config.WithClientLogMode(aws.LogSigning|aws.LogRetries|aws.LogRequest|aws.LogResponse|aws.LogRequestEventMessage|aws.LogResponseEventMessage))
}
return config.LoadDefaultConfig(ctx, opts...)
}
// ResolveEndpoint is used for on prem or non-aws endpoints
func (s *S3be) ResolveEndpoint(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: s.endpoint,
SigningRegion: s.awsRegion,
HostnameImmutable: true,
}, nil
}

500
backend/s3proxy/s3.go Normal file
View File

@@ -0,0 +1,500 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3proxy
import (
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3response"
)
type S3be struct {
backend.BackendUnsupported
endpoint string
awsRegion string
disableChecksum bool
sslSkipVerify bool
debug bool
}
func New(endpoint, region string, disableChecksum, sslSkipVerify, debug bool) *S3be {
return &S3be{
endpoint: endpoint,
awsRegion: region,
disableChecksum: disableChecksum,
sslSkipVerify: sslSkipVerify,
debug: debug,
}
}
func (s *S3be) ListBuckets(ctx context.Context, owner string, isAdmin bool) (s3response.ListAllMyBucketsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.ListAllMyBucketsResult{}, err
}
output, err := client.ListBuckets(ctx, &s3.ListBucketsInput{})
if err != nil {
return s3response.ListAllMyBucketsResult{}, err
}
var buckets []s3response.ListAllMyBucketsEntry
for _, b := range output.Buckets {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *b.Name,
CreationDate: *b.CreationDate,
})
}
return s3response.ListAllMyBucketsResult{
Owner: s3response.CanonicalUser{
ID: *output.Owner.ID,
DisplayName: *output.Owner.DisplayName,
},
Buckets: s3response.ListAllMyBucketsList{
Bucket: buckets,
},
}, nil
}
func (s *S3be) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.HeadBucket(ctx, input)
}
func (s *S3be) CreateBucket(ctx context.Context, input *s3.CreateBucketInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.CreateBucket(ctx, input)
return err
}
func (s *S3be) DeleteBucket(ctx context.Context, input *s3.DeleteBucketInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.DeleteBucket(ctx, input)
return err
}
func (s *S3be) CreateMultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.CreateMultipartUpload(ctx, input)
}
func (s *S3be) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.CompleteMultipartUpload(ctx, input)
}
func (s *S3be) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.AbortMultipartUpload(ctx, input)
return err
}
const (
iso8601Format = "20060102T150405Z"
)
func (s *S3be) ListMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.ListMultipartUploadsResult{}, err
}
output, err := client.ListMultipartUploads(ctx, input)
if err != nil {
return s3response.ListMultipartUploadsResult{}, err
}
var uploads []s3response.Upload
for _, u := range output.Uploads {
uploads = append(uploads, s3response.Upload{
Key: *u.Key,
UploadID: *u.UploadId,
Initiator: s3response.Initiator{
ID: *u.Initiator.ID,
DisplayName: *u.Initiator.DisplayName,
},
Owner: s3response.Owner{
ID: *u.Owner.ID,
DisplayName: *u.Owner.DisplayName,
},
StorageClass: string(u.StorageClass),
Initiated: u.Initiated.Format(iso8601Format),
})
}
var cps []s3response.CommonPrefix
for _, c := range output.CommonPrefixes {
cps = append(cps, s3response.CommonPrefix{
Prefix: *c.Prefix,
})
}
return s3response.ListMultipartUploadsResult{
Bucket: *output.Bucket,
KeyMarker: *output.KeyMarker,
UploadIDMarker: *output.UploadIdMarker,
NextKeyMarker: *output.NextKeyMarker,
NextUploadIDMarker: *output.NextUploadIdMarker,
Delimiter: *output.Delimiter,
Prefix: *output.Prefix,
EncodingType: string(output.EncodingType),
MaxUploads: int(output.MaxUploads),
IsTruncated: output.IsTruncated,
Uploads: uploads,
CommonPrefixes: cps,
}, nil
}
func (s *S3be) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.ListPartsResult{}, err
}
output, err := client.ListParts(ctx, input)
if err != nil {
return s3response.ListPartsResult{}, err
}
var parts []s3response.Part
for _, p := range output.Parts {
parts = append(parts, s3response.Part{
PartNumber: int(p.PartNumber),
LastModified: p.LastModified.Format(iso8601Format),
ETag: *p.ETag,
Size: p.Size,
})
}
pnm, err := strconv.Atoi(*output.PartNumberMarker)
if err != nil {
return s3response.ListPartsResult{},
fmt.Errorf("parse part number marker: %w", err)
}
npmn, err := strconv.Atoi(*output.NextPartNumberMarker)
if err != nil {
return s3response.ListPartsResult{},
fmt.Errorf("parse next part number marker: %w", err)
}
return s3response.ListPartsResult{
Bucket: *output.Bucket,
Key: *output.Key,
UploadID: *output.UploadId,
Initiator: s3response.Initiator{
ID: *output.Initiator.ID,
DisplayName: *output.Initiator.DisplayName,
},
Owner: s3response.Owner{
ID: *output.Owner.ID,
DisplayName: *output.Owner.DisplayName,
},
StorageClass: string(output.StorageClass),
PartNumberMarker: pnm,
NextPartNumberMarker: npmn,
MaxParts: int(output.MaxParts),
IsTruncated: output.IsTruncated,
Parts: parts,
}, nil
}
func (s *S3be) UploadPart(ctx context.Context, input *s3.UploadPartInput) (etag string, err error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return "", err
}
output, err := client.UploadPart(ctx, input)
if err != nil {
return "", err
}
return *output.ETag, nil
}
func (s *S3be) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.CopyObjectResult{}, err
}
output, err := client.UploadPartCopy(ctx, input)
if err != nil {
return s3response.CopyObjectResult{}, err
}
return s3response.CopyObjectResult{
LastModified: *output.CopyPartResult.LastModified,
ETag: *output.CopyPartResult.ETag,
}, nil
}
func (s *S3be) PutObject(ctx context.Context, input *s3.PutObjectInput) (string, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return "", err
}
output, err := client.PutObject(ctx, input)
if err != nil {
return "", err
}
return *output.ETag, nil
}
func (s *S3be) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.HeadObject(ctx, input)
}
func (s *S3be) GetObject(ctx context.Context, input *s3.GetObjectInput, w io.Writer) (*s3.GetObjectOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
output, err := client.GetObject(ctx, input)
if err != nil {
return nil, err
}
defer output.Body.Close()
_, err = io.Copy(w, output.Body)
if err != nil {
return nil, err
}
return output, nil
}
func (s *S3be) GetObjectAttributes(ctx context.Context, input *s3.GetObjectAttributesInput) (*s3.GetObjectAttributesOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.GetObjectAttributes(ctx, input)
}
func (s *S3be) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.CopyObjectOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.CopyObject(ctx, input)
}
func (s *S3be) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.ListObjects(ctx, input)
}
func (s *S3be) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (*s3.ListObjectsV2Output, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.ListObjectsV2(ctx, input)
}
func (s *S3be) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.DeleteObject(ctx, input)
return err
}
func (s *S3be) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput) (s3response.DeleteObjectsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.DeleteObjectsResult{}, err
}
output, err := client.DeleteObjects(ctx, input)
if err != nil {
return s3response.DeleteObjectsResult{}, err
}
return s3response.DeleteObjectsResult{
Deleted: output.Deleted,
Error: output.Errors,
}, nil
}
func (s *S3be) GetBucketAcl(ctx context.Context, input *s3.GetBucketAclInput) ([]byte, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
output, err := client.GetBucketAcl(ctx, input)
if err != nil {
return nil, err
}
var acl auth.ACL
acl.Owner = *output.Owner.ID
for _, el := range output.Grants {
acl.Grantees = append(acl.Grantees, auth.Grantee{
Permission: el.Permission,
Access: *el.Grantee.ID,
})
}
return json.Marshal(acl)
}
func (s S3be) PutBucketAcl(ctx context.Context, bucket string, data []byte) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
acl, err := auth.ParseACL(data)
if err != nil {
return err
}
input := &s3.PutBucketAclInput{
Bucket: &bucket,
ACL: acl.ACL,
AccessControlPolicy: &types.AccessControlPolicy{
Owner: &types.Owner{
ID: &acl.Owner,
},
},
}
for _, el := range acl.Grantees {
input.AccessControlPolicy.Grants = append(input.AccessControlPolicy.Grants, types.Grant{
Permission: el.Permission,
Grantee: &types.Grantee{
ID: &el.Access,
Type: types.TypeCanonicalUser,
},
})
}
_, err = client.PutBucketAcl(ctx, input)
return err
}
func (s *S3be) PutObjectTagging(ctx context.Context, bucket, object string, tags map[string]string) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
tagging := &types.Tagging{
TagSet: []types.Tag{},
}
for key, val := range tags {
tagging.TagSet = append(tagging.TagSet, types.Tag{
Key: &key,
Value: &val,
})
}
_, err = client.PutObjectTagging(ctx, &s3.PutObjectTaggingInput{
Bucket: &bucket,
Key: &object,
Tagging: tagging,
})
return err
}
func (s *S3be) GetObjectTagging(ctx context.Context, bucket, object string) (map[string]string, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
output, err := client.GetObjectTagging(ctx, &s3.GetObjectTaggingInput{
Bucket: &bucket,
Key: &object,
})
if err != nil {
return nil, err
}
tags := make(map[string]string)
for _, el := range output.TagSet {
tags[*el.Key] = *el.Value
}
return tags, nil
}
func (s *S3be) DeleteObjectTagging(ctx context.Context, bucket, object string) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.DeleteObjectTagging(ctx, &s3.DeleteObjectTaggingInput{
Bucket: &bucket,
Key: &object,
})
return err
}

View File

@@ -70,6 +70,7 @@ func main() {
app.Commands = []*cli.Command{
posixCommand(),
scoutfsCommand(),
s3Command(),
adminCommand(),
testCommand(),
}

76
cmd/versitygw/s3.go Normal file
View File

@@ -0,0 +1,76 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"github.com/urfave/cli/v2"
"github.com/versity/versitygw/backend/s3proxy"
)
var (
s3proxyEndpoint string
s3proxyRegion string
s3proxyDisableChecksum bool
s3proxySslSkipVerify bool
s3proxyDebug bool
)
func s3Command() *cli.Command {
return &cli.Command{
Name: "s3",
Usage: "s3 storage backend",
Description: `This runs the gateway like an s3 proxy redirecting requests
to an s3 storage backend service.`,
Action: runS3,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "endpoint",
Usage: "s3 service endpoint, default AWS if not specified",
Value: "",
Destination: &s3proxyEndpoint,
},
&cli.StringFlag{
Name: "region",
Usage: "s3 service region, default 'us-east-1' if not specified",
Value: "us-east-1",
Destination: &s3proxyRegion,
},
&cli.BoolFlag{
Name: "disable-checksum",
Usage: "disable gateway to server object checksums",
Value: false,
Destination: &s3proxyDisableChecksum,
},
&cli.BoolFlag{
Name: "ssl-skip-verify",
Usage: "skip ssl cert verification for s3 service",
Value: false,
Destination: &s3proxySslSkipVerify,
},
&cli.BoolFlag{
Name: "debug",
Usage: "output extra debug tracing",
Value: false,
Destination: &s3proxyDebug,
},
},
}
}
func runS3(ctx *cli.Context) error {
be := s3proxy.New(s3proxyEndpoint, s3proxyRegion,
s3proxyDisableChecksum, s3proxySslSkipVerify, s3proxyDebug)
return runGateway(ctx, be)
}