feat: s3proxy backend

This backend redirects incoming requests to another s3 service.
This will use the incoming credentials to setup the client
requests to the external s3 service. So the IAM accounts (or
root account) must match what the external s3 service expects.
This commit is contained in:
Ben McClelland
2023-10-02 09:06:24 -07:00
parent 641841f9d5
commit f58646b58d
4 changed files with 542 additions and 0 deletions

86
backend/s3proxy/client.go Normal file
View File

@@ -0,0 +1,86 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3proxy
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/middleware"
"github.com/versity/versitygw/auth"
)
func (s *S3be) getClientFromCtx(ctx context.Context) (*s3.Client, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
return nil, fmt.Errorf("invalid account in context")
}
cfg, err := s.getConfig(ctx, acct.Access, acct.Secret)
if err != nil {
return nil, err
}
return s3.NewFromConfig(cfg), nil
}
func (s *S3be) getConfig(ctx context.Context, access, secret string) (aws.Config, error) {
creds := credentials.NewStaticCredentialsProvider(access, secret, "")
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: s.sslSkipVerify},
}
client := &http.Client{Transport: tr}
opts := []func(*config.LoadOptions) error{
config.WithRegion(s.awsRegion),
config.WithCredentialsProvider(creds),
config.WithHTTPClient(client),
}
if s.endpoint != "" {
opts = append(opts,
config.WithEndpointResolverWithOptions(s))
}
if s.disableChecksum {
opts = append(opts,
config.WithAPIOptions([]func(*middleware.Stack) error{v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware}))
}
if s.debug {
opts = append(opts,
config.WithClientLogMode(aws.LogSigning|aws.LogRetries|aws.LogRequest|aws.LogResponse|aws.LogRequestEventMessage|aws.LogResponseEventMessage))
}
return config.LoadDefaultConfig(ctx, opts...)
}
// ResolveEndpoint is used for on prem or non-aws endpoints
func (s *S3be) ResolveEndpoint(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
PartitionID: "aws",
URL: s.endpoint,
SigningRegion: s.awsRegion,
HostnameImmutable: true,
}, nil
}

379
backend/s3proxy/s3.go Normal file
View File

@@ -0,0 +1,379 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3proxy
import (
"context"
"fmt"
"io"
"strconv"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3response"
)
type S3be struct {
backend.BackendUnsupported
endpoint string
awsRegion string
disableChecksum bool
sslSkipVerify bool
debug bool
}
func New(endpoint, region string, disableChecksum, sslSkipVerify, debug bool) *S3be {
return &S3be{
endpoint: endpoint,
awsRegion: region,
disableChecksum: disableChecksum,
sslSkipVerify: sslSkipVerify,
debug: debug,
}
}
func (s *S3be) ListBuckets(ctx context.Context, owner string, isAdmin bool) (s3response.ListAllMyBucketsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.ListAllMyBucketsResult{}, err
}
output, err := client.ListBuckets(ctx, &s3.ListBucketsInput{})
if err != nil {
return s3response.ListAllMyBucketsResult{}, err
}
var buckets []s3response.ListAllMyBucketsEntry
for _, b := range output.Buckets {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *b.Name,
CreationDate: *b.CreationDate,
})
}
return s3response.ListAllMyBucketsResult{
Owner: s3response.CanonicalUser{
ID: *output.Owner.ID,
DisplayName: *output.Owner.DisplayName,
},
Buckets: s3response.ListAllMyBucketsList{
Bucket: buckets,
},
}, nil
}
func (s *S3be) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.HeadBucket(ctx, input)
}
func (s *S3be) CreateBucket(ctx context.Context, input *s3.CreateBucketInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.CreateBucket(ctx, input)
return err
}
func (s *S3be) DeleteBucket(ctx context.Context, input *s3.DeleteBucketInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.DeleteBucket(ctx, input)
return err
}
func (s *S3be) CreateMultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.CreateMultipartUpload(ctx, input)
}
func (s *S3be) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.CompleteMultipartUpload(ctx, input)
}
func (s *S3be) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.AbortMultipartUpload(ctx, input)
return err
}
const (
iso8601Format = "20060102T150405Z"
)
func (s *S3be) ListMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.ListMultipartUploadsResult{}, err
}
output, err := client.ListMultipartUploads(ctx, input)
if err != nil {
return s3response.ListMultipartUploadsResult{}, err
}
var uploads []s3response.Upload
for _, u := range output.Uploads {
uploads = append(uploads, s3response.Upload{
Key: *u.Key,
UploadID: *u.UploadId,
Initiator: s3response.Initiator{
ID: *u.Initiator.ID,
DisplayName: *u.Initiator.DisplayName,
},
Owner: s3response.Owner{
ID: *u.Owner.ID,
DisplayName: *u.Owner.DisplayName,
},
StorageClass: string(u.StorageClass),
Initiated: u.Initiated.Format(iso8601Format),
})
}
var cps []s3response.CommonPrefix
for _, c := range output.CommonPrefixes {
cps = append(cps, s3response.CommonPrefix{
Prefix: *c.Prefix,
})
}
return s3response.ListMultipartUploadsResult{
Bucket: *output.Bucket,
KeyMarker: *output.KeyMarker,
UploadIDMarker: *output.UploadIdMarker,
NextKeyMarker: *output.NextKeyMarker,
NextUploadIDMarker: *output.NextUploadIdMarker,
Delimiter: *output.Delimiter,
Prefix: *output.Prefix,
EncodingType: string(output.EncodingType),
MaxUploads: int(output.MaxUploads),
IsTruncated: output.IsTruncated,
Uploads: uploads,
CommonPrefixes: cps,
}, nil
}
func (s *S3be) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.ListPartsResult{}, err
}
output, err := client.ListParts(ctx, input)
if err != nil {
return s3response.ListPartsResult{}, err
}
var parts []s3response.Part
for _, p := range output.Parts {
parts = append(parts, s3response.Part{
PartNumber: int(p.PartNumber),
LastModified: p.LastModified.Format(iso8601Format),
ETag: *p.ETag,
Size: p.Size,
})
}
pnm, err := strconv.Atoi(*output.PartNumberMarker)
if err != nil {
return s3response.ListPartsResult{},
fmt.Errorf("parse part number marker: %w", err)
}
npmn, err := strconv.Atoi(*output.NextPartNumberMarker)
if err != nil {
return s3response.ListPartsResult{},
fmt.Errorf("parse next part number marker: %w", err)
}
return s3response.ListPartsResult{
Bucket: *output.Bucket,
Key: *output.Key,
UploadID: *output.UploadId,
Initiator: s3response.Initiator{
ID: *output.Initiator.ID,
DisplayName: *output.Initiator.DisplayName,
},
Owner: s3response.Owner{
ID: *output.Owner.ID,
DisplayName: *output.Owner.DisplayName,
},
StorageClass: string(output.StorageClass),
PartNumberMarker: pnm,
NextPartNumberMarker: npmn,
MaxParts: int(output.MaxParts),
IsTruncated: output.IsTruncated,
Parts: parts,
}, nil
}
func (s *S3be) UploadPart(ctx context.Context, input *s3.UploadPartInput) (etag string, err error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return "", err
}
output, err := client.UploadPart(ctx, input)
if err != nil {
return "", err
}
return *output.ETag, nil
}
func (s *S3be) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.CopyObjectResult{}, err
}
output, err := client.UploadPartCopy(ctx, input)
if err != nil {
return s3response.CopyObjectResult{}, err
}
return s3response.CopyObjectResult{
LastModified: *output.CopyPartResult.LastModified,
ETag: *output.CopyPartResult.ETag,
}, nil
}
func (s *S3be) PutObject(ctx context.Context, input *s3.PutObjectInput) (string, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return "", err
}
output, err := client.PutObject(ctx, input)
if err != nil {
return "", err
}
return *output.ETag, nil
}
func (s *S3be) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.HeadObject(ctx, input)
}
func (s *S3be) GetObject(ctx context.Context, input *s3.GetObjectInput, w io.Writer) (*s3.GetObjectOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
output, err := client.GetObject(ctx, input)
if err != nil {
return nil, err
}
defer output.Body.Close()
_, err = io.Copy(w, output.Body)
if err != nil {
return nil, err
}
return output, nil
}
func (s *S3be) GetObjectAttributes(ctx context.Context, input *s3.GetObjectAttributesInput) (*s3.GetObjectAttributesOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.GetObjectAttributes(ctx, input)
}
func (s *S3be) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.CopyObjectOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.CopyObject(ctx, input)
}
func (s *S3be) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.ListObjects(ctx, input)
}
func (s *S3be) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (*s3.ListObjectsV2Output, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return nil, err
}
return client.ListObjectsV2(ctx, input)
}
func (s *S3be) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) error {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return err
}
_, err = client.DeleteObject(ctx, input)
return err
}
func (s *S3be) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput) (s3response.DeleteObjectsResult, error) {
client, err := s.getClientFromCtx(ctx)
if err != nil {
return s3response.DeleteObjectsResult{}, err
}
output, err := client.DeleteObjects(ctx, input)
if err != nil {
return s3response.DeleteObjectsResult{}, err
}
return s3response.DeleteObjectsResult{
Deleted: output.Deleted,
Error: output.Errors,
}, nil
}

View File

@@ -70,6 +70,7 @@ func main() {
app.Commands = []*cli.Command{
posixCommand(),
scoutfsCommand(),
s3Command(),
adminCommand(),
testCommand(),
}

76
cmd/versitygw/s3.go Normal file
View File

@@ -0,0 +1,76 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"github.com/urfave/cli/v2"
"github.com/versity/versitygw/backend/s3proxy"
)
var (
s3proxyEndpoint string
s3proxyRegion string
s3proxyDisableChecksum bool
s3proxySslSkipVerify bool
s3proxyDebug bool
)
func s3Command() *cli.Command {
return &cli.Command{
Name: "s3",
Usage: "s3 storage backend",
Description: `This runs the gateway like an s3 proxy redirecting requests
to an s3 storage backend service.`,
Action: runS3,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "endpoint",
Usage: "s3 service endpoint, default AWS if not specified",
Value: "",
Destination: &s3proxyEndpoint,
},
&cli.StringFlag{
Name: "region",
Usage: "s3 service region, default 'us-east-1' if not specified",
Value: "us-east-1",
Destination: &s3proxyRegion,
},
&cli.BoolFlag{
Name: "disable-checksum",
Usage: "disable gateway to server object checksums",
Value: false,
Destination: &s3proxyDisableChecksum,
},
&cli.BoolFlag{
Name: "ssl-skip-verify",
Usage: "skip ssl cert verification for s3 service",
Value: false,
Destination: &s3proxySslSkipVerify,
},
&cli.BoolFlag{
Name: "debug",
Usage: "output extra debug tracing",
Value: false,
Destination: &s3proxyDebug,
},
},
}
}
func runS3(ctx *cli.Context) error {
be := s3proxy.New(s3proxyEndpoint, s3proxyRegion,
s3proxyDisableChecksum, s3proxySslSkipVerify, s3proxyDebug)
return runGateway(ctx, be)
}