Compare commits

...

4 Commits

Author SHA1 Message Date
Ben McClelland
7086579590 fix: list objects trim common prefixes that match marker prefix
This checks to see if the common prefix is before the marker and
thus would have been returned in earlier list objects request.

The error case was aws cli listing multiple entries for the same
common prefix when the listing required multiple pagination
requests.

Fixes #778
2024-09-16 14:55:55 -07:00
jonaustin09
dea5e0c0b2 feat: Implemented object versioning for multipart uploads. Implemented integration tests for the versioning implementation for multipart uploads 2024-09-16 14:51:22 -07:00
jonaustin09
16995acc17 feat: Added integration tests for bucket object versioning. Made a couple of bug fixes in the versioning implementation 2024-09-16 14:51:20 -07:00
jonaustin09
1f41f91f2d feat: basic logic implementation of bucket object versioning in posix backend
New posix backend option --versioning-dir will enable storing object versions
in specified directory.
2024-09-16 14:42:24 -07:00
22 changed files with 3167 additions and 243 deletions

View File

@@ -36,6 +36,7 @@ const (
ListBucketMultipartUploadsAction Action = "s3:ListBucketMultipartUploads"
PutObjectAction Action = "s3:PutObject"
GetObjectAction Action = "s3:GetObject"
GetObjectVersionAction Action = "s3:GetObjectVersion"
DeleteObjectAction Action = "s3:DeleteObject"
GetObjectAclAction Action = "s3:GetObjectAcl"
GetObjectAttributesAction Action = "s3:GetObjectAttributes"
@@ -75,6 +76,7 @@ var supportedActionList = map[Action]struct{}{
ListBucketMultipartUploadsAction: {},
PutObjectAction: {},
GetObjectAction: {},
GetObjectVersionAction: {},
DeleteObjectAction: {},
GetObjectAclAction: {},
GetObjectAttributesAction: {},
@@ -103,6 +105,7 @@ var supportedObjectActionList = map[Action]struct{}{
ListMultipartUploadPartsAction: {},
PutObjectAction: {},
GetObjectAction: {},
GetObjectVersionAction: {},
DeleteObjectAction: {},
GetObjectAclAction: {},
GetObjectAttributesAction: {},

View File

@@ -284,10 +284,10 @@ func (az *Azure) DeleteBucketOwnershipControls(ctx context.Context, bucket strin
return az.deleteContainerMetaData(ctx, bucket, string(keyOwnership))
}
func (az *Azure) PutObject(ctx context.Context, po *s3.PutObjectInput) (string, error) {
func (az *Azure) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
tags, err := parseTags(po.Tagging)
if err != nil {
return "", err
return s3response.PutObjectOutput{}, err
}
opts := &blockblob.UploadStreamOptions{
@@ -312,14 +312,14 @@ func (az *Azure) PutObject(ctx context.Context, po *s3.PutObjectInput) (string,
uploadResp, err := az.client.UploadStream(ctx, *po.Bucket, *po.Key, po.Body, opts)
if err != nil {
return "", azureErrToS3Err(err)
return s3response.PutObjectOutput{}, azureErrToS3Err(err)
}
// Set object legal hold
if po.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
err := az.PutObjectLegalHold(ctx, *po.Bucket, *po.Key, "", true)
if err != nil {
return "", err
return s3response.PutObjectOutput{}, err
}
}
@@ -331,15 +331,17 @@ func (az *Azure) PutObject(ctx context.Context, po *s3.PutObjectInput) (string,
}
retParsed, err := json.Marshal(retention)
if err != nil {
return "", fmt.Errorf("parse object lock retention: %w", err)
return s3response.PutObjectOutput{}, fmt.Errorf("parse object lock retention: %w", err)
}
err = az.PutObjectRetention(ctx, *po.Bucket, *po.Key, "", true, retParsed)
if err != nil {
return "", err
return s3response.PutObjectOutput{}, err
}
}
return string(*uploadResp.ETag), nil
return s3response.PutObjectOutput{
ETag: string(*uploadResp.ETag),
}, nil
}
func (az *Azure) PutBucketTagging(ctx context.Context, bucket string, tags map[string]string) error {
@@ -663,22 +665,22 @@ Pager:
}, nil
}
func (az *Azure) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) error {
func (az *Azure) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
_, err := az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil)
if err != nil {
azerr, ok := err.(*azcore.ResponseError)
if ok && azerr.StatusCode == 404 {
// if the object does not exist, S3 returns success
return nil
return &s3.DeleteObjectOutput{}, nil
}
}
return azureErrToS3Err(err)
return &s3.DeleteObjectOutput{}, azureErrToS3Err(err)
}
func (az *Azure) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput) (s3response.DeleteResult, error) {
delResult, errs := []types.DeletedObject{}, []types.Error{}
for _, obj := range input.Delete.Objects {
err := az.DeleteObject(ctx, &s3.DeleteObjectInput{
_, err := az.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: input.Bucket,
Key: obj.Key,
})

View File

@@ -38,7 +38,7 @@ type Backend interface {
CreateBucket(_ context.Context, _ *s3.CreateBucketInput, defaultACL []byte) error
PutBucketAcl(_ context.Context, bucket string, data []byte) error
DeleteBucket(context.Context, *s3.DeleteBucketInput) error
PutBucketVersioning(context.Context, *s3.PutBucketVersioningInput) error
PutBucketVersioning(_ context.Context, bucket string, status types.BucketVersioningStatus) error
GetBucketVersioning(_ context.Context, bucket string) (*s3.GetBucketVersioningOutput, error)
PutBucketPolicy(_ context.Context, bucket string, policy []byte) error
GetBucketPolicy(_ context.Context, bucket string) ([]byte, error)
@@ -57,7 +57,7 @@ type Backend interface {
UploadPartCopy(context.Context, *s3.UploadPartCopyInput) (s3response.CopyObjectResult, error)
// standard object operations
PutObject(context.Context, *s3.PutObjectInput) (string, error)
PutObject(context.Context, *s3.PutObjectInput) (s3response.PutObjectOutput, error)
HeadObject(context.Context, *s3.HeadObjectInput) (*s3.HeadObjectOutput, error)
GetObject(context.Context, *s3.GetObjectInput) (*s3.GetObjectOutput, error)
GetObjectAcl(context.Context, *s3.GetObjectAclInput) (*s3.GetObjectAclOutput, error)
@@ -65,10 +65,10 @@ type Backend interface {
CopyObject(context.Context, *s3.CopyObjectInput) (*s3.CopyObjectOutput, error)
ListObjects(context.Context, *s3.ListObjectsInput) (s3response.ListObjectsResult, error)
ListObjectsV2(context.Context, *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error)
DeleteObject(context.Context, *s3.DeleteObjectInput) error
DeleteObject(context.Context, *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error)
DeleteObjects(context.Context, *s3.DeleteObjectsInput) (s3response.DeleteResult, error)
PutObjectAcl(context.Context, *s3.PutObjectAclInput) error
ListObjectVersions(context.Context, *s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error)
ListObjectVersions(context.Context, *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error)
// special case object operations
RestoreObject(context.Context, *s3.RestoreObjectInput) error
@@ -126,7 +126,7 @@ func (BackendUnsupported) PutBucketAcl(_ context.Context, bucket string, data []
func (BackendUnsupported) DeleteBucket(context.Context, *s3.DeleteBucketInput) error {
return s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) PutBucketVersioning(context.Context, *s3.PutBucketVersioningInput) error {
func (BackendUnsupported) PutBucketVersioning(_ context.Context, bucket string, status types.BucketVersioningStatus) error {
return s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) GetBucketVersioning(_ context.Context, bucket string) (*s3.GetBucketVersioningOutput, error) {
@@ -173,8 +173,8 @@ func (BackendUnsupported) UploadPartCopy(context.Context, *s3.UploadPartCopyInpu
return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) PutObject(context.Context, *s3.PutObjectInput) (string, error) {
return "", s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) PutObject(context.Context, *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) HeadObject(context.Context, *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
@@ -197,8 +197,8 @@ func (BackendUnsupported) ListObjects(context.Context, *s3.ListObjectsInput) (s3
func (BackendUnsupported) ListObjectsV2(context.Context, *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error) {
return s3response.ListObjectsV2Result{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) DeleteObject(context.Context, *s3.DeleteObjectInput) error {
return s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) DeleteObject(context.Context, *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) DeleteObjects(context.Context, *s3.DeleteObjectsInput) (s3response.DeleteResult, error) {
return s3response.DeleteResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
@@ -225,8 +225,8 @@ func (BackendUnsupported) SelectObjectContent(ctx context.Context, input *s3.Sel
}
}
func (BackendUnsupported) ListObjectVersions(context.Context, *s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNotImplemented)
func (BackendUnsupported) ListObjectVersions(context.Context, *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error) {
return s3response.ListVersionsResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) GetBucketTagging(_ context.Context, bucket string) (map[string]string, error) {

View File

@@ -102,6 +102,32 @@ func ParseRange(size int64, acceptRange string) (int64, int64, error) {
return startOffset, endOffset - startOffset + 1, nil
}
// ParseCopySource parses x-amz-copy-source header and returns source bucket,
// source object, versionId, error respectively
func ParseCopySource(copySourceHeader string) (string, string, string, error) {
if copySourceHeader[0] == '/' {
copySourceHeader = copySourceHeader[1:]
}
cSplitted := strings.Split(copySourceHeader, "?")
copySource := cSplitted[0]
var versionId string
if len(cSplitted) > 1 {
versionIdParts := strings.Split(cSplitted[1], "=")
if len(versionIdParts) != 2 || versionIdParts[0] != "versionId" {
return "", "", "", s3err.GetAPIError(s3err.ErrInvalidRequest)
}
versionId = versionIdParts[1]
}
srcBucket, srcObject, ok := strings.Cut(copySource, "/")
if !ok {
return "", "", "", s3err.GetAPIError(s3err.ErrInvalidCopySource)
}
return srcBucket, srcObject, versionId, nil
}
func CreateExceedingRangeErr(objSize int64) s3err.APIError {
return s3err.APIError{
Code: "InvalidArgument",

File diff suppressed because it is too large Load Diff

View File

@@ -161,6 +161,48 @@ func (s *S3Proxy) DeleteBucketOwnershipControls(ctx context.Context, bucket stri
return handleError(err)
}
func (s *S3Proxy) PutBucketVersioning(ctx context.Context, bucket string, status types.BucketVersioningStatus) error {
_, err := s.client.PutBucketVersioning(ctx, &s3.PutBucketVersioningInput{
Bucket: &bucket,
VersioningConfiguration: &types.VersioningConfiguration{
Status: status,
},
})
return handleError(err)
}
func (s *S3Proxy) GetBucketVersioning(ctx context.Context, bucket string) (*s3.GetBucketVersioningOutput, error) {
out, err := s.client.GetBucketVersioning(ctx, &s3.GetBucketVersioningInput{
Bucket: &bucket,
})
return out, handleError(err)
}
func (s *S3Proxy) ListObjectVersions(ctx context.Context, input *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error) {
out, err := s.client.ListObjectVersions(ctx, input)
if err != nil {
return s3response.ListVersionsResult{}, handleError(err)
}
return s3response.ListVersionsResult{
CommonPrefixes: out.CommonPrefixes,
DeleteMarkers: out.DeleteMarkers,
Delimiter: out.Delimiter,
EncodingType: out.EncodingType,
IsTruncated: out.IsTruncated,
KeyMarker: out.KeyMarker,
MaxKeys: out.MaxKeys,
Name: out.Name,
NextKeyMarker: out.NextKeyMarker,
NextVersionIdMarker: out.NextVersionIdMarker,
Prefix: out.Prefix,
VersionIdMarker: input.VersionIdMarker,
Versions: out.Versions,
}, nil
}
func (s *S3Proxy) CreateMultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
out, err := s.client.CreateMultipartUpload(ctx, input)
if err != nil {
@@ -304,17 +346,25 @@ func (s *S3Proxy) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyIn
}, nil
}
func (s *S3Proxy) PutObject(ctx context.Context, input *s3.PutObjectInput) (string, error) {
func (s *S3Proxy) PutObject(ctx context.Context, input *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
// streaming backend is not seekable,
// use unsigned payload for streaming ops
output, err := s.client.PutObject(ctx, input, s3.WithAPIOptions(
v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware,
))
if err != nil {
return "", handleError(err)
return s3response.PutObjectOutput{}, handleError(err)
}
return *output.ETag, nil
var versionID string
if output.VersionId != nil {
versionID = *output.VersionId
}
return s3response.PutObjectOutput{
ETag: *output.ETag,
VersionID: versionID,
}, nil
}
func (s *S3Proxy) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
@@ -416,9 +466,9 @@ func (s *S3Proxy) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Inpu
}, nil
}
func (s *S3Proxy) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) error {
_, err := s.client.DeleteObject(ctx, input)
return handleError(err)
func (s *S3Proxy) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
res, err := s.client.DeleteObject(ctx, input)
return res, handleError(err)
}
func (s *S3Proxy) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput) (s3response.DeleteResult, error) {

View File

@@ -191,12 +191,19 @@ func Walk(ctx context.Context, fileSystem fs.FS, prefix, delimiter, marker strin
// Common prefixes are a set, so should not have duplicates.
// These are abstractly a "directory", so need to include the
// delimiter at the end.
// delimiter at the end when we add to the map.
cprefNoDelim := prefix + before
cpref := prefix + before + delimiter
if cpref == marker {
pastMarker = true
return nil
}
if marker != "" && strings.HasPrefix(marker, cprefNoDelim) {
// skip common prefixes that are before the marker
return nil
}
cpmap[cpref] = struct{}{}
if (len(objects) + len(cpmap)) == int(max) {
newMarker = cpref
@@ -239,3 +246,205 @@ func contains(a string, strs []string) bool {
}
return false
}
type WalkVersioningResults struct {
CommonPrefixes []types.CommonPrefix
ObjectVersions []types.ObjectVersion
DelMarkers []types.DeleteMarkerEntry
Truncated bool
NextMarker string
NextVersionIdMarker string
}
type ObjVersionFuncResult struct {
ObjectVersions []types.ObjectVersion
DelMarkers []types.DeleteMarkerEntry
NextVersionIdMarker string
Truncated bool
}
type GetVersionsFunc func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*ObjVersionFuncResult, error)
// WalkVersions walks the supplied fs.FS and returns results compatible with
// ListObjectVersions action response
func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyMarker, versionIdMarker string, max int, getObj GetVersionsFunc, skipdirs []string) (WalkVersioningResults, error) {
cpmap := make(map[string]struct{})
var objects []types.ObjectVersion
var delMarkers []types.DeleteMarkerEntry
var pastMarker bool
if keyMarker == "" {
pastMarker = true
}
var nextMarker string
var nextVersionIdMarker string
var truncated bool
pastVersionIdMarker := versionIdMarker == ""
err := fs.WalkDir(fileSystem, ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
// Ignore the root directory
if path == "." {
return nil
}
if contains(d.Name(), skipdirs) {
return fs.SkipDir
}
if !pastMarker {
if path == keyMarker {
pastMarker = true
}
if path < keyMarker {
return nil
}
}
if d.IsDir() {
// If prefix is defined and the directory does not match prefix,
// do not descend into the directory because nothing will
// match this prefix. Make sure to append the / at the end of
// directories since this is implied as a directory path name.
// If path is a prefix of prefix, then path could still be
// building to match. So only skip if path isn't a prefix of prefix
// and prefix isn't a prefix of path.
if prefix != "" &&
!strings.HasPrefix(path+string(os.PathSeparator), prefix) &&
!strings.HasPrefix(prefix, path+string(os.PathSeparator)) {
return fs.SkipDir
}
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("directory to object %q: %w", path, err)
}
objects = append(objects, res.ObjectVersions...)
delMarkers = append(delMarkers, res.DelMarkers...)
if res.Truncated {
truncated = true
nextMarker = path
nextVersionIdMarker = res.NextVersionIdMarker
return fs.SkipAll
}
return nil
}
// If object doesn't have prefix, don't include in results.
if prefix != "" && !strings.HasPrefix(path, prefix) {
return nil
}
if delimiter == "" {
// If no delimiter specified, then all files with matching
// prefix are included in results
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("file to object %q: %w", path, err)
}
objects = append(objects, res.ObjectVersions...)
delMarkers = append(delMarkers, res.DelMarkers...)
if res.Truncated {
truncated = true
nextMarker = path
nextVersionIdMarker = res.NextVersionIdMarker
return fs.SkipAll
}
return nil
}
// Since delimiter is specified, we only want results that
// do not contain the delimiter beyond the prefix. If the
// delimiter exists past the prefix, then the substring
// between the prefix and delimiter is part of common prefixes.
//
// For example:
// prefix = A/
// delimiter = /
// and objects:
// A/file
// A/B/file
// B/C
// would return:
// objects: A/file
// common prefix: A/B/
//
// Note: No objects are included past the common prefix since
// these are all rolled up into the common prefix.
// Note: The delimiter can be anything, so we have to operate on
// the full path without any assumptions on posix directory hierarchy
// here. Usually the delimiter will be "/", but thats not required.
suffix := strings.TrimPrefix(path, prefix)
before, _, found := strings.Cut(suffix, delimiter)
if !found {
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("file to object %q: %w", path, err)
}
objects = append(objects, res.ObjectVersions...)
delMarkers = append(delMarkers, res.DelMarkers...)
if res.Truncated {
truncated = true
nextMarker = path
nextVersionIdMarker = res.NextVersionIdMarker
return fs.SkipAll
}
return nil
}
// Common prefixes are a set, so should not have duplicates.
// These are abstractly a "directory", so need to include the
// delimiter at the end.
cpmap[prefix+before+delimiter] = struct{}{}
if (len(objects) + len(cpmap)) == int(max) {
nextMarker = path
truncated = true
return fs.SkipAll
}
return nil
})
if err != nil {
return WalkVersioningResults{}, err
}
var commonPrefixStrings []string
for k := range cpmap {
commonPrefixStrings = append(commonPrefixStrings, k)
}
sort.Strings(commonPrefixStrings)
commonPrefixes := make([]types.CommonPrefix, 0, len(commonPrefixStrings))
for _, cp := range commonPrefixStrings {
pfx := cp
commonPrefixes = append(commonPrefixes, types.CommonPrefix{
Prefix: &pfx,
})
}
return WalkVersioningResults{
CommonPrefixes: commonPrefixes,
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: truncated,
NextMarker: nextMarker,
NextVersionIdMarker: nextVersionIdMarker,
}, nil
}

View File

@@ -25,6 +25,7 @@ import (
var (
chownuid, chowngid bool
bucketlinks bool
versioningDir string
)
func posixCommand() *cli.Command {
@@ -61,6 +62,12 @@ will be translated into the file /mnt/fs/gwroot/mybucket/a/b/c/myobject`,
EnvVars: []string{"VGW_BUCKET_LINKS"},
Destination: &bucketlinks,
},
&cli.StringFlag{
Name: "versioning-dir",
Usage: "the directory path to enable bucket versioning",
EnvVars: []string{"VGW_VERSIONING_DIR"},
Destination: &versioningDir,
},
},
}
}
@@ -77,9 +84,10 @@ func runPosix(ctx *cli.Context) error {
}
be, err := posix.New(gwroot, meta.XattrMeta{}, posix.PosixOpts{
ChownUID: chownuid,
ChownGID: chowngid,
BucketLinks: bucketlinks,
ChownUID: chownuid,
ChownGID: chowngid,
BucketLinks: bucketlinks,
VersioningDir: versioningDir,
})
if err != nil {
return fmt.Errorf("init posix: %v", err)

View File

@@ -22,20 +22,21 @@ import (
)
var (
awsID string
awsSecret string
endpoint string
prefix string
dstBucket string
partSize int64
objSize int64
concurrency int
files int
totalReqs int
upload bool
download bool
pathStyle bool
checksumDisable bool
awsID string
awsSecret string
endpoint string
prefix string
dstBucket string
partSize int64
objSize int64
concurrency int
files int
totalReqs int
upload bool
download bool
pathStyle bool
checksumDisable bool
versioningEnabled bool
)
func testCommand() *cli.Command {
@@ -87,6 +88,14 @@ func initTestCommands() []*cli.Command {
Usage: "Tests the full flow of gateway.",
Description: `Runs all the available tests to test the full flow of the gateway.`,
Action: getAction(integration.TestFullFlow),
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "versioning-enabled",
Usage: "Test the bucket object versioning, if the versioning is enabled",
Destination: &versioningEnabled,
Aliases: []string{"vs"},
},
},
},
{
Name: "posix",
@@ -276,6 +285,9 @@ func getAction(tf testFunc) func(*cli.Context) error {
if debug {
opts = append(opts, integration.WithDebug())
}
if versioningEnabled {
opts = append(opts, integration.WithVersioningEnabled())
}
s := integration.NewS3Conf(opts...)
tf(s)

1
go.mod
View File

@@ -16,6 +16,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/vault-client-go v0.4.3
github.com/nats-io/nats.go v1.37.0
github.com/oklog/ulid/v2 v2.1.0
github.com/pkg/xattr v0.4.10
github.com/segmentio/kafka-go v0.4.47
github.com/smira/go-statsd v1.3.3

3
go.sum
View File

@@ -129,6 +129,9 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=

View File

@@ -5,9 +5,11 @@ rm -rf /tmp/gw
mkdir /tmp/gw
rm -rf /tmp/covdata
mkdir /tmp/covdata
rm -rf /tmp/versioningdir
mkdir /tmp/versioningdir
# run server in background
GOCOVERDIR=/tmp/covdata ./versitygw -a user -s pass --iam-dir /tmp/gw posix /tmp/gw &
GOCOVERDIR=/tmp/covdata ./versitygw -a user -s pass --iam-dir /tmp/gw posix --versioning-dir /tmp/versioningdir /tmp/gw &
GW_PID=$!
# wait a second for server to start up
@@ -21,7 +23,7 @@ fi
# run tests
# full flow tests
if ! ./versitygw test -a user -s pass -e http://127.0.0.1:7070 full-flow; then
if ! ./versitygw test -a user -s pass -e http://127.0.0.1:7070 full-flow -vs; then
echo "full flow tests failed"
kill $GW_PID
exit 1

View File

@@ -53,7 +53,7 @@ var _ backend.Backend = &BackendMock{}
// DeleteBucketTaggingFunc: func(contextMoqParam context.Context, bucket string) error {
// panic("mock out the DeleteBucketTagging method")
// },
// DeleteObjectFunc: func(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) error {
// DeleteObjectFunc: func(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
// panic("mock out the DeleteObject method")
// },
// DeleteObjectTaggingFunc: func(contextMoqParam context.Context, bucket string, object string) error {
@@ -113,7 +113,7 @@ var _ backend.Backend = &BackendMock{}
// ListMultipartUploadsFunc: func(contextMoqParam context.Context, listMultipartUploadsInput *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) {
// panic("mock out the ListMultipartUploads method")
// },
// ListObjectVersionsFunc: func(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error) {
// ListObjectVersionsFunc: func(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error) {
// panic("mock out the ListObjectVersions method")
// },
// ListObjectsFunc: func(contextMoqParam context.Context, listObjectsInput *s3.ListObjectsInput) (s3response.ListObjectsResult, error) {
@@ -137,10 +137,10 @@ var _ backend.Backend = &BackendMock{}
// PutBucketTaggingFunc: func(contextMoqParam context.Context, bucket string, tags map[string]string) error {
// panic("mock out the PutBucketTagging method")
// },
// PutBucketVersioningFunc: func(contextMoqParam context.Context, putBucketVersioningInput *s3.PutBucketVersioningInput) error {
// PutBucketVersioningFunc: func(contextMoqParam context.Context, bucket string, status types.BucketVersioningStatus) error {
// panic("mock out the PutBucketVersioning method")
// },
// PutObjectFunc: func(contextMoqParam context.Context, putObjectInput *s3.PutObjectInput) (string, error) {
// PutObjectFunc: func(contextMoqParam context.Context, putObjectInput *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
// panic("mock out the PutObject method")
// },
// PutObjectAclFunc: func(contextMoqParam context.Context, putObjectAclInput *s3.PutObjectAclInput) error {
@@ -214,7 +214,7 @@ type BackendMock struct {
DeleteBucketTaggingFunc func(contextMoqParam context.Context, bucket string) error
// DeleteObjectFunc mocks the DeleteObject method.
DeleteObjectFunc func(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) error
DeleteObjectFunc func(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error)
// DeleteObjectTaggingFunc mocks the DeleteObjectTagging method.
DeleteObjectTaggingFunc func(contextMoqParam context.Context, bucket string, object string) error
@@ -274,7 +274,7 @@ type BackendMock struct {
ListMultipartUploadsFunc func(contextMoqParam context.Context, listMultipartUploadsInput *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error)
// ListObjectVersionsFunc mocks the ListObjectVersions method.
ListObjectVersionsFunc func(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error)
ListObjectVersionsFunc func(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error)
// ListObjectsFunc mocks the ListObjects method.
ListObjectsFunc func(contextMoqParam context.Context, listObjectsInput *s3.ListObjectsInput) (s3response.ListObjectsResult, error)
@@ -298,10 +298,10 @@ type BackendMock struct {
PutBucketTaggingFunc func(contextMoqParam context.Context, bucket string, tags map[string]string) error
// PutBucketVersioningFunc mocks the PutBucketVersioning method.
PutBucketVersioningFunc func(contextMoqParam context.Context, putBucketVersioningInput *s3.PutBucketVersioningInput) error
PutBucketVersioningFunc func(contextMoqParam context.Context, bucket string, status types.BucketVersioningStatus) error
// PutObjectFunc mocks the PutObject method.
PutObjectFunc func(contextMoqParam context.Context, putObjectInput *s3.PutObjectInput) (string, error)
PutObjectFunc func(contextMoqParam context.Context, putObjectInput *s3.PutObjectInput) (s3response.PutObjectOutput, error)
// PutObjectAclFunc mocks the PutObjectAcl method.
PutObjectAclFunc func(contextMoqParam context.Context, putObjectAclInput *s3.PutObjectAclInput) error
@@ -632,8 +632,10 @@ type BackendMock struct {
PutBucketVersioning []struct {
// ContextMoqParam is the contextMoqParam argument value.
ContextMoqParam context.Context
// PutBucketVersioningInput is the putBucketVersioningInput argument value.
PutBucketVersioningInput *s3.PutBucketVersioningInput
// Bucket is the bucket argument value.
Bucket string
// Status is the status argument value.
Status types.BucketVersioningStatus
}
// PutObject holds details about calls to the PutObject method.
PutObject []struct {
@@ -1154,7 +1156,7 @@ func (mock *BackendMock) DeleteBucketTaggingCalls() []struct {
}
// DeleteObject calls DeleteObjectFunc.
func (mock *BackendMock) DeleteObject(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) error {
func (mock *BackendMock) DeleteObject(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
if mock.DeleteObjectFunc == nil {
panic("BackendMock.DeleteObjectFunc: method is nil but Backend.DeleteObject was just called")
}
@@ -1898,7 +1900,7 @@ func (mock *BackendMock) ListMultipartUploadsCalls() []struct {
}
// ListObjectVersions calls ListObjectVersionsFunc.
func (mock *BackendMock) ListObjectVersions(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error) {
func (mock *BackendMock) ListObjectVersions(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error) {
if mock.ListObjectVersionsFunc == nil {
panic("BackendMock.ListObjectVersionsFunc: method is nil but Backend.ListObjectVersions was just called")
}
@@ -2202,21 +2204,23 @@ func (mock *BackendMock) PutBucketTaggingCalls() []struct {
}
// PutBucketVersioning calls PutBucketVersioningFunc.
func (mock *BackendMock) PutBucketVersioning(contextMoqParam context.Context, putBucketVersioningInput *s3.PutBucketVersioningInput) error {
func (mock *BackendMock) PutBucketVersioning(contextMoqParam context.Context, bucket string, status types.BucketVersioningStatus) error {
if mock.PutBucketVersioningFunc == nil {
panic("BackendMock.PutBucketVersioningFunc: method is nil but Backend.PutBucketVersioning was just called")
}
callInfo := struct {
ContextMoqParam context.Context
PutBucketVersioningInput *s3.PutBucketVersioningInput
ContextMoqParam context.Context
Bucket string
Status types.BucketVersioningStatus
}{
ContextMoqParam: contextMoqParam,
PutBucketVersioningInput: putBucketVersioningInput,
ContextMoqParam: contextMoqParam,
Bucket: bucket,
Status: status,
}
mock.lockPutBucketVersioning.Lock()
mock.calls.PutBucketVersioning = append(mock.calls.PutBucketVersioning, callInfo)
mock.lockPutBucketVersioning.Unlock()
return mock.PutBucketVersioningFunc(contextMoqParam, putBucketVersioningInput)
return mock.PutBucketVersioningFunc(contextMoqParam, bucket, status)
}
// PutBucketVersioningCalls gets all the calls that were made to PutBucketVersioning.
@@ -2224,12 +2228,14 @@ func (mock *BackendMock) PutBucketVersioning(contextMoqParam context.Context, pu
//
// len(mockedBackend.PutBucketVersioningCalls())
func (mock *BackendMock) PutBucketVersioningCalls() []struct {
ContextMoqParam context.Context
PutBucketVersioningInput *s3.PutBucketVersioningInput
ContextMoqParam context.Context
Bucket string
Status types.BucketVersioningStatus
} {
var calls []struct {
ContextMoqParam context.Context
PutBucketVersioningInput *s3.PutBucketVersioningInput
ContextMoqParam context.Context
Bucket string
Status types.BucketVersioningStatus
}
mock.lockPutBucketVersioning.RLock()
calls = mock.calls.PutBucketVersioning
@@ -2238,7 +2244,7 @@ func (mock *BackendMock) PutBucketVersioningCalls() []struct {
}
// PutObject calls PutObjectFunc.
func (mock *BackendMock) PutObject(contextMoqParam context.Context, putObjectInput *s3.PutObjectInput) (string, error) {
func (mock *BackendMock) PutObject(contextMoqParam context.Context, putObjectInput *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
if mock.PutObjectFunc == nil {
panic("BackendMock.PutObjectFunc: method is nil but Backend.PutObject was just called")
}

View File

@@ -382,6 +382,11 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
})
}
action := auth.GetObjectAction
if versionId != "" {
action = auth.GetObjectVersionAction
}
err := auth.VerifyAccess(ctx.Context(), c.be, auth.AccessOptions{
Readonly: c.readonly,
Acl: parsedAcl,
@@ -390,7 +395,7 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
Acc: acct,
Bucket: bucket,
Object: key,
Action: auth.GetObjectAction,
Action: action,
})
if err != nil {
return SendResponse(ctx, err,
@@ -410,11 +415,23 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
VersionId: &versionId,
})
if err != nil {
if res != nil {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-delete-marker",
Value: "true",
},
{
Key: "Last-Modified",
Value: res.LastModified.Format(timefmt),
},
})
}
return SendResponse(ctx, err,
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
Action: metrics.ActionGetObject,
Action: metrics.ActionHeadObject,
BucketOwner: parsedAcl.Owner,
})
}
@@ -478,6 +495,15 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
utils.SetMetaHeaders(ctx, res.Metadata)
// Set other response headers
utils.SetResponseHeaders(ctx, hdrs)
// Set version id header
if getstring(res.VersionId) != "" {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-version-id",
Value: getstring(res.VersionId),
},
})
}
status := http.StatusOK
if acceptRange != "" {
@@ -981,8 +1007,8 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error {
objectOwnership := types.ObjectOwnership(
ctx.Get("X-Amz-Object-Ownership", string(types.ObjectOwnershipBucketOwnerEnforced)),
)
mfa := ctx.Get("X-Amz-Mfa")
contentMD5 := ctx.Get("Content-MD5")
// mfa := ctx.Get("X-Amz-Mfa")
// contentMD5 := ctx.Get("Content-MD5")
acct := ctx.Locals("account").(auth.Account)
isRoot := ctx.Locals("isRoot").(bool)
@@ -1136,13 +1162,21 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error {
})
}
err = c.be.PutBucketVersioning(ctx.Context(),
&s3.PutBucketVersioningInput{
Bucket: &bucket,
MFA: &mfa,
VersioningConfiguration: &versioningConf,
ContentMD5: &contentMD5,
})
if versioningConf.Status != types.BucketVersioningStatusEnabled &&
versioningConf.Status != types.BucketVersioningStatusSuspended {
if c.debug {
log.Printf("invalid versioning configuration status: %v\n", versioningConf.Status)
}
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrMalformedXML),
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
Action: metrics.ActionPutBucketVersioning,
BucketOwner: parsedAcl.Owner,
})
}
err = c.be.PutBucketVersioning(ctx.Context(), bucket, versioningConf.Status)
return SendResponse(ctx, err,
&MetaOpts{
Logger: c.logger,
@@ -1816,6 +1850,14 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
ExpectedBucketOwner: &bucketOwner,
CopySourceRange: &copySrcRange,
})
if err == nil && resp.CopySourceVersionId != "" {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-copy-source-version-id",
Value: resp.CopySourceVersionId,
},
})
}
return SendXMLResponse(ctx, resp, err,
&MetaOpts{
Logger: c.logger,
@@ -2141,6 +2183,21 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
StorageClass: types.StorageClass(storageClass),
})
if err == nil {
hdrs := []utils.CustomHeader{}
if getstring(res.VersionId) != "" {
hdrs = append(hdrs, utils.CustomHeader{
Key: "x-amz-version-id",
Value: getstring(res.VersionId),
})
}
if getstring(res.CopySourceVersionId) != "" {
hdrs = append(hdrs, utils.CustomHeader{
Key: "x-amz-copy-source-version-id",
Value: getstring(res.CopySourceVersionId),
})
}
utils.SetResponseHeaders(ctx, hdrs)
return SendXMLResponse(ctx, res.CopyObjectResult, err,
&MetaOpts{
Logger: c.logger,
@@ -2232,7 +2289,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
}
ctx.Locals("logReqBody", false)
etag, err := c.be.PutObject(ctx.Context(),
res, err := c.be.PutObject(ctx.Context(),
&s3.PutObjectInput{
Bucket: &bucket,
Key: &keyStart,
@@ -2246,8 +2303,36 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
ObjectLockMode: objLock.ObjectLockMode,
ObjectLockLegalHoldStatus: objLock.LegalHoldStatus,
})
ctx.Response().Header.Set("ETag", etag)
return SendResponse(ctx, err,
if err != nil {
return SendResponse(ctx, err,
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
ContentLength: contentLength,
EvSender: c.evSender,
Action: metrics.ActionPutObject,
BucketOwner: parsedAcl.Owner,
ObjectSize: contentLength,
EventName: s3event.EventObjectCreatedPut,
})
}
hdrs := []utils.CustomHeader{
{
Key: "ETag",
Value: res.ETag,
},
}
if res.VersionID != "" {
hdrs = append(hdrs, utils.CustomHeader{
Key: "x-amz-version-id",
Value: res.VersionID,
})
}
utils.SetResponseHeaders(ctx, hdrs)
return SendResponse(ctx, nil,
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
@@ -2255,7 +2340,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
EvSender: c.evSender,
Action: metrics.ActionPutObject,
BucketOwner: parsedAcl.Owner,
ObjectETag: &etag,
ObjectETag: &res.ETag,
ObjectSize: contentLength,
EventName: s3event.EventObjectCreatedPut,
})
@@ -2569,6 +2654,8 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error {
})
}
//TODO: check s3:DeleteObjectVersion policy in case a use tries to delete a version of an object
err := auth.VerifyAccess(ctx.Context(), c.be,
auth.AccessOptions{
Readonly: c.readonly,
@@ -2604,13 +2691,42 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error {
})
}
err = c.be.DeleteObject(ctx.Context(),
res, err := c.be.DeleteObject(ctx.Context(),
&s3.DeleteObjectInput{
Bucket: &bucket,
Key: &key,
VersionId: &versionId,
})
return SendResponse(ctx, err,
if err != nil {
return SendResponse(ctx, err,
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
EvSender: c.evSender,
Action: metrics.ActionDeleteObject,
BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectRemovedDelete,
Status: http.StatusNoContent,
})
}
hdrs := []utils.CustomHeader{}
if res.VersionId != nil && *res.VersionId != "" {
hdrs = append(hdrs, utils.CustomHeader{
Key: "x-amz-version-id",
Value: *res.VersionId,
})
}
if res.DeleteMarker != nil && *res.DeleteMarker {
hdrs = append(hdrs, utils.CustomHeader{
Key: "x-amz-delete-marker",
Value: "true",
})
}
utils.SetResponseHeaders(ctx, hdrs)
return SendResponse(ctx, nil,
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
@@ -2683,6 +2799,7 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error {
isRoot := ctx.Locals("isRoot").(bool)
parsedAcl := ctx.Locals("parsedAcl").(auth.ACL)
partNumberQuery := int32(ctx.QueryInt("partNumber", -1))
versionId := ctx.Query("versionId")
key := ctx.Params("key")
keyEnd := ctx.Params("*1")
if keyEnd != "" {
@@ -2737,8 +2854,21 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error {
Bucket: &bucket,
Key: &key,
PartNumber: partNumber,
VersionId: &versionId,
})
if err != nil {
if res != nil {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-delete-marker",
Value: "true",
},
{
Key: "Last-Modified",
Value: res.LastModified.Format(timefmt),
},
})
}
return SendResponse(ctx, err,
&MetaOpts{
Logger: c.logger,
@@ -2826,6 +2956,13 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error {
Value: contentType,
})
if getstring(res.VersionId) != "" {
headers = append(headers, utils.CustomHeader{
Key: "x-amz-version-id",
Value: getstring(res.VersionId),
})
}
utils.SetResponseHeaders(ctx, headers)
return SendResponse(ctx, nil,
@@ -3015,6 +3152,14 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
},
})
if err == nil {
if getstring(res.VersionId) != "" {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-version-id",
Value: getstring(res.VersionId),
},
})
}
return SendXMLResponse(ctx, res, err,
&MetaOpts{
Logger: c.logger,

View File

@@ -385,8 +385,8 @@ func TestS3ApiController_ListActions(t *testing.T) {
GetBucketVersioningFunc: func(contextMoqParam context.Context, bucket string) (*s3.GetBucketVersioningOutput, error) {
return &s3.GetBucketVersioningOutput{}, nil
},
ListObjectVersionsFunc: func(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error) {
return &s3.ListObjectVersionsOutput{}, nil
ListObjectVersionsFunc: func(contextMoqParam context.Context, listObjectVersionsInput *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error) {
return s3response.ListVersionsResult{}, nil
},
GetBucketPolicyFunc: func(contextMoqParam context.Context, bucket string) ([]byte, error) {
return []byte{}, nil
@@ -677,7 +677,7 @@ func TestS3ApiController_PutBucketActions(t *testing.T) {
PutBucketTaggingFunc: func(contextMoqParam context.Context, bucket string, tags map[string]string) error {
return nil
},
PutBucketVersioningFunc: func(contextMoqParam context.Context, putBucketVersioningInput *s3.PutBucketVersioningInput) error {
PutBucketVersioningFunc: func(contextMoqParam context.Context, bucket string, status types.BucketVersioningStatus) error {
return nil
},
PutBucketPolicyFunc: func(contextMoqParam context.Context, bucket string, policy []byte) error {
@@ -968,8 +968,8 @@ func TestS3ApiController_PutActions(t *testing.T) {
CopyObjectResult: &types.CopyObjectResult{},
}, nil
},
PutObjectFunc: func(context.Context, *s3.PutObjectInput) (string, error) {
return "ETag", nil
PutObjectFunc: func(context.Context, *s3.PutObjectInput) (s3response.PutObjectOutput, error) {
return s3response.PutObjectOutput{}, nil
},
UploadPartFunc: func(context.Context, *s3.UploadPartInput) (string, error) {
return "hello", nil
@@ -1383,8 +1383,8 @@ func TestS3ApiController_DeleteActions(t *testing.T) {
GetBucketAclFunc: func(context.Context, *s3.GetBucketAclInput) ([]byte, error) {
return acldata, nil
},
DeleteObjectFunc: func(context.Context, *s3.DeleteObjectInput) error {
return nil
DeleteObjectFunc: func(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
return &s3.DeleteObjectOutput{}, nil
},
AbortMultipartUploadFunc: func(context.Context, *s3.AbortMultipartUploadInput) error {
return nil
@@ -1414,8 +1414,8 @@ func TestS3ApiController_DeleteActions(t *testing.T) {
GetBucketAclFunc: func(context.Context, *s3.GetBucketAclInput) ([]byte, error) {
return acldata, nil
},
DeleteObjectFunc: func(context.Context, *s3.DeleteObjectInput) error {
return s3err.GetAPIError(7)
DeleteObjectFunc: func(contextMoqParam context.Context, deleteObjectInput *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
},
GetObjectLockConfigurationFunc: func(contextMoqParam context.Context, bucket string) ([]byte, error) {
return nil, s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotFound)

View File

@@ -132,6 +132,8 @@ const (
ErrMissingSecurityHeader
ErrInvalidMetadataDirective
ErrKeyTooLong
ErrInvalidVersionId
ErrNoSuchVersion
// Non-AWS errors
ErrExistingObjectIsDirectory
@@ -517,8 +519,12 @@ var errorCodeResponse = map[ErrorCode]APIError{
HTTPStatusCode: http.StatusNotFound,
},
ErrInvalidMetadataDirective: {
Code: "InvalidArgument",
Description: "Unknown metadata directive.",
},
ErrInvalidVersionId: {
Code: "InvalidArgument",
Description: "Unknown metadata directive.",
Description: "Invalid version id specified",
HTTPStatusCode: http.StatusBadRequest,
},
ErrKeyTooLong: {
@@ -526,6 +532,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Your key is too long.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrNoSuchVersion: {
Code: "NoSuchVersion",
Description: "The specified version does not exist.",
HTTPStatusCode: http.StatusNotFound,
},
// non aws errors
ErrExistingObjectIsDirectory: {

View File

@@ -23,6 +23,11 @@ import (
const RFC3339TimeFormat = "2006-01-02T15:04:05.999Z"
type PutObjectOutput struct {
ETag string
VersionID string
}
// Part describes part metadata.
type Part struct {
PartNumber int
@@ -302,9 +307,10 @@ type CanonicalUser struct {
}
type CopyObjectResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CopyObjectResult" json:"-"`
LastModified time.Time
ETag string
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CopyObjectResult" json:"-"`
LastModified time.Time
ETag string
CopySourceVersionId string `xml:"-"`
}
func (r CopyObjectResult) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
@@ -360,3 +366,21 @@ type InitiateMultipartUploadResult struct {
Key string
UploadId string
}
type ListVersionsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult" json:"-"`
CommonPrefixes []types.CommonPrefix
DeleteMarkers []types.DeleteMarkerEntry `xml:"DeleteMarker"`
Delimiter *string
EncodingType types.EncodingType
IsTruncated *bool
KeyMarker *string
MaxKeys *int32
Name *string
NextKeyMarker *string
NextVersionIdMarker *string
Prefix *string
RequestCharged types.RequestCharged
VersionIdMarker *string
Versions []types.ObjectVersion `xml:"Version"`
}

View File

@@ -99,6 +99,7 @@ check_universal_vars() {
if [ "$RUN_VERSITYGW" != "true" ] && [ "$RUN_VERSITYGW" != "false" ]; then
fail "RUN_VERSITYGW must be 'true' or 'false'"
fi
<<<<<<< HEAD
if [ -z "$BUCKET_ONE_NAME" ]; then
log 1 "BUCKET_ONE_NAME missing"
@@ -113,12 +114,29 @@ check_universal_vars() {
exit 1
fi
if [ "$RECREATE_BUCKETS" != "true" ] && [ "$RECREATE_BUCKETS" != "false" ]; then
<<<<<<< HEAD
log 1 "RECREATE_BUCKETS must be 'true' or 'false'"
exit 1
fi
if [ -z "$TEST_FILE_FOLDER" ]; then
log 1 "TEST_FILE_FOLDER missing"
exit 1
=======
fail "RECREATE_BUCKETS must be 'true' or 'false'"
=======
if [[ -n "$VERSITY_LOG_FILE" ]]; then
export VERSITY_LOG_FILE
fi
if [[ -n "$DIRECT" ]]; then
export DIRECT
fi
if [[ -n "$DIRECT_DISPLAY_NAME" ]]; then
export DIRECT_DISPLAY_NAME
fi
if [[ -n "$COVERAGE_DB" ]]; then
export COVERAGE_DB
>>>>>>> 95e62d2 (fix: Merge conflicts resolved)
>>>>>>> 8595c19 (feat: Added integration tests for bucket object versioning. Made a couple of bug fixes in the versioning implementation)
fi
# exporting these since they're needed for subshells
export AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_REGION AWS_PROFILE AWS_ENDPOINT_URL

View File

@@ -468,6 +468,9 @@ func TestFullFlow(s *S3Conf) {
TestGetObjectLegalHold(s)
TestWORMProtection(s)
TestAccessControl(s)
if s.versioningEnabled {
TestVersioning(s)
}
}
func TestPosix(s *S3Conf) {
@@ -502,6 +505,45 @@ func TestAccessControl(s *S3Conf) {
AccessControl_copy_object_with_starting_slash_for_user(s)
}
func TestVersioning(s *S3Conf) {
// PutBucketVersioning action
PutBucketVersioning_non_existing_bucket(s)
PutBucketVersioning_invalid_status(s)
PutBucketVersioning_success(s)
// GetBucketVersioning action
GetBucketVersioning_non_existing_bucket(s)
GetBucketVersioning_success(s)
Versioning_PutObject_success(s)
// CopyObject action
Versioning_CopyObject_success(s)
Versioning_CopyObject_non_existing_version_id(s)
Versioning_CopyObject_from_an_object_version(s)
// HeadObject action
Versioning_HeadObject_invalid_versionId(s)
Versioning_HeadObject_success(s)
Versioning_HeadObject_delete_marker(s)
// GetObject action
Versioning_GetObject_invalid_versionId(s)
Versioning_GetObject_success(s)
Versioning_GetObject_delete_marker(s)
// DeleteObject(s) actions
Versioning_DeleteObject_delete_object_version(s)
Versioning_DeleteObject_delete_a_delete_marker(s)
Versioning_DeleteObjects_success(s)
Versioning_DeleteObjects_delete_deleteMarkers(s)
// ListObjectVersions
ListObjectVersions_non_existing_bucket(s)
ListObjectVersions_list_single_object_versions(s)
ListObjectVersions_list_multiple_object_versions(s)
ListObjectVersions_multiple_object_versions_truncated(s)
ListObjectVersions_with_delete_markers(s)
// Multipart upload
Versioning_Multipart_Upload_success(s)
Versioning_Multipart_Upload_overwrite_an_object(s)
Versioning_UploadPartCopy_non_existing_versionId(s)
Versioning_UploadPartCopy_from_an_object_version(s)
}
type IntTests map[string]func(s *S3Conf) error
func GetIntTests() IntTests {
@@ -810,5 +852,33 @@ func GetIntTests() IntTests {
"AccessControl_root_PutBucketAcl": AccessControl_root_PutBucketAcl,
"AccessControl_user_PutBucketAcl_with_policy_access": AccessControl_user_PutBucketAcl_with_policy_access,
"AccessControl_copy_object_with_starting_slash_for_user": AccessControl_copy_object_with_starting_slash_for_user,
"PutBucketVersioning_non_existing_bucket": PutBucketVersioning_non_existing_bucket,
"PutBucketVersioning_invalid_status": PutBucketVersioning_invalid_status,
"PutBucketVersioning_success": PutBucketVersioning_success,
"GetBucketVersioning_non_existing_bucket": GetBucketVersioning_non_existing_bucket,
"GetBucketVersioning_success": GetBucketVersioning_success,
"Versioning_PutObject_success": Versioning_PutObject_success,
"Versioning_CopyObject_success": Versioning_CopyObject_success,
"Versioning_CopyObject_non_existing_version_id": Versioning_CopyObject_non_existing_version_id,
"Versioning_CopyObject_from_an_object_version": Versioning_CopyObject_from_an_object_version,
"Versioning_HeadObject_invalid_versionId": Versioning_HeadObject_invalid_versionId,
"Versioning_HeadObject_success": Versioning_HeadObject_success,
"Versioning_HeadObject_delete_marker": Versioning_HeadObject_delete_marker,
"Versioning_GetObject_invalid_versionId": Versioning_GetObject_invalid_versionId,
"Versioning_GetObject_success": Versioning_GetObject_success,
"Versioning_GetObject_delete_marker": Versioning_GetObject_delete_marker,
"Versioning_DeleteObject_delete_object_version": Versioning_DeleteObject_delete_object_version,
"Versioning_DeleteObject_delete_a_delete_marker": Versioning_DeleteObject_delete_a_delete_marker,
"Versioning_DeleteObjects_success": Versioning_DeleteObjects_success,
"Versioning_DeleteObjects_delete_deleteMarkers": Versioning_DeleteObjects_delete_deleteMarkers,
"ListObjectVersions_non_existing_bucket": ListObjectVersions_non_existing_bucket,
"ListObjectVersions_list_single_object_versions": ListObjectVersions_list_single_object_versions,
"ListObjectVersions_list_multiple_object_versions": ListObjectVersions_list_multiple_object_versions,
"ListObjectVersions_multiple_object_versions_truncated": ListObjectVersions_multiple_object_versions_truncated,
"ListObjectVersions_with_delete_markers": ListObjectVersions_with_delete_markers,
"Versioning_Multipart_Upload_success": Versioning_Multipart_Upload_success,
"Versioning_Multipart_Upload_overwrite_an_object": Versioning_Multipart_Upload_overwrite_an_object,
"Versioning_UploadPartCopy_non_existing_versionId": Versioning_UploadPartCopy_non_existing_versionId,
"Versioning_UploadPartCopy_from_an_object_version": Versioning_UploadPartCopy_from_an_object_version,
}
}

View File

@@ -31,15 +31,16 @@ import (
)
type S3Conf struct {
awsID string
awsSecret string
awsRegion string
endpoint string
checksumDisable bool
pathStyle bool
PartSize int64
Concurrency int
debug bool
awsID string
awsSecret string
awsRegion string
endpoint string
checksumDisable bool
pathStyle bool
PartSize int64
Concurrency int
debug bool
versioningEnabled bool
}
func NewS3Conf(opts ...Option) *S3Conf {
@@ -80,6 +81,9 @@ func WithConcurrency(c int) Option {
func WithDebug() Option {
return func(s *S3Conf) { s.debug = true }
}
func WithVersioningEnabled() Option {
return func(s *S3Conf) { s.versioningEnabled = true }
}
func (c *S3Conf) getCreds() credentials.StaticCredentialsProvider {
// TODO support token/IAM

File diff suppressed because it is too large Load Diff

View File

@@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
"math/big"
rnd "math/rand"
"net/http"
"net/url"
@@ -70,7 +71,25 @@ func setup(s *S3Conf, bucket string, opts ...setupOpt) error {
ObjectOwnership: cfg.Ownership,
})
cancel()
return err
if err != nil {
return err
}
if cfg.VersioningEnabled {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err := s3client.PutBucketVersioning(ctx, &s3.PutBucketVersioningInput{
Bucket: &bucket,
VersioningConfiguration: &types.VersioningConfiguration{
Status: types.BucketVersioningStatusEnabled,
},
})
cancel()
if err != nil {
return err
}
}
return nil
}
func teardown(s *S3Conf, bucket string) error {
@@ -90,24 +109,31 @@ func teardown(s *S3Conf, bucket string) error {
return nil
}
in := &s3.ListObjectsV2Input{Bucket: &bucket}
in := &s3.ListObjectVersionsInput{Bucket: &bucket}
for {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
out, err := s3client.ListObjectsV2(ctx, in)
out, err := s3client.ListObjectVersions(ctx, in)
cancel()
if err != nil {
return fmt.Errorf("failed to list objects: %w", err)
}
for _, item := range out.Contents {
err = deleteObject(&bucket, item.Key, nil)
for _, item := range out.Versions {
err = deleteObject(&bucket, item.Key, item.VersionId)
if err != nil {
return err
}
}
for _, item := range out.DeleteMarkers {
err = deleteObject(&bucket, item.Key, item.VersionId)
if err != nil {
return err
}
}
if out.IsTruncated != nil && *out.IsTruncated {
in.ContinuationToken = out.ContinuationToken
in.KeyMarker = out.KeyMarker
in.VersionIdMarker = out.NextVersionIdMarker
} else {
break
}
@@ -122,8 +148,9 @@ func teardown(s *S3Conf, bucket string) error {
}
type setupCfg struct {
LockEnabled bool
Ownership types.ObjectOwnership
LockEnabled bool
VersioningEnabled bool
Ownership types.ObjectOwnership
}
type setupOpt func(*setupCfg)
@@ -134,6 +161,9 @@ func withLock() setupOpt {
func withOwnership(o types.ObjectOwnership) setupOpt {
return func(s *setupCfg) { s.Ownership = o }
}
func withVersioning() setupOpt {
return func(s *setupCfg) { s.VersioningEnabled = true }
}
func actionHandler(s *S3Conf, testName string, handler func(s3client *s3.Client, bucket string) error, opts ...setupOpt) error {
runF(testName)
@@ -313,18 +343,31 @@ func putObjects(client *s3.Client, objs []string, bucket string) ([]types.Object
return contents, nil
}
func putObjectWithData(lgth int64, input *s3.PutObjectInput, client *s3.Client) (csum [32]byte, data []byte, err error) {
data = make([]byte, lgth)
type putObjectOutput struct {
csum [32]byte
data []byte
res *s3.PutObjectOutput
}
func putObjectWithData(lgth int64, input *s3.PutObjectInput, client *s3.Client) (*putObjectOutput, error) {
data := make([]byte, lgth)
rand.Read(data)
csum = sha256.Sum256(data)
csum := sha256.Sum256(data)
r := bytes.NewReader(data)
input.Body = r
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = client.PutObject(ctx, input)
res, err := client.PutObject(ctx, input)
cancel()
if err != nil {
return nil, err
}
return
return &putObjectOutput{
csum: csum,
data: data,
res: res,
}, nil
}
func createMp(s3client *s3.Client, bucket, key string) (*s3.CreateMultipartUploadOutput, error) {
@@ -522,21 +565,40 @@ func comparePrefixes(list1 []string, list2 []types.CommonPrefix) bool {
return true
}
func compareDelObjects(list1 []string, list2 []types.DeletedObject) bool {
func compareDelObjects(list1, list2 []types.DeletedObject) bool {
if len(list1) != len(list2) {
return false
}
elementMap := make(map[string]bool)
for _, elem := range list1 {
elementMap[elem] = true
}
for _, elem := range list2 {
if _, found := elementMap[*elem.Key]; !found {
for i, obj := range list1 {
if *obj.Key != *list2[i].Key {
return false
}
if obj.VersionId != nil {
if list2[i].VersionId == nil {
return false
}
if *obj.VersionId != *list2[i].VersionId {
return false
}
}
if obj.DeleteMarkerVersionId != nil {
if list2[i].DeleteMarkerVersionId == nil {
return false
}
if *obj.DeleteMarkerVersionId != *list2[i].DeleteMarkerVersionId {
return false
}
}
if obj.DeleteMarker != nil {
if list2[i].DeleteMarker == nil {
return false
}
if *obj.DeleteMarker != *list2[i].DeleteMarker {
return false
}
}
}
return true
@@ -774,3 +836,124 @@ func checkWORMProtection(client *s3.Client, bucket, object string) error {
return nil
}
func createObjVersions(client *s3.Client, bucket, object string, count int) ([]types.ObjectVersion, error) {
versions := []types.ObjectVersion{}
for i := 0; i < count; i++ {
rNumber, err := rand.Int(rand.Reader, big.NewInt(100000))
dataLength := rNumber.Int64()
if err != nil {
return nil, err
}
r, err := putObjectWithData(dataLength, &s3.PutObjectInput{
Bucket: &bucket,
Key: &object,
}, client)
if err != nil {
return nil, err
}
isLatest := i == count-1
versions = append(versions, types.ObjectVersion{
ETag: r.res.ETag,
IsLatest: &isLatest,
Key: &object,
Size: &dataLength,
VersionId: r.res.VersionId,
})
}
versions = reverseSlice(versions)
return versions, nil
}
// ReverseSlice reverses a slice of any type
func reverseSlice[T any](s []T) []T {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}
func compareVersions(v1, v2 []types.ObjectVersion) bool {
if len(v1) != len(v2) {
return false
}
for i, version := range v1 {
if version.Key == nil || v2[i].Key == nil {
return false
}
if *version.Key != *v2[i].Key {
return false
}
if version.VersionId == nil || v2[i].VersionId == nil {
return false
}
if *version.VersionId != *v2[i].VersionId {
return false
}
if version.IsLatest == nil || v2[i].IsLatest == nil {
return false
}
if *version.IsLatest != *v2[i].IsLatest {
return false
}
if version.Size == nil || v2[i].Size == nil {
return false
}
if *version.Size != *v2[i].Size {
return false
}
if version.ETag == nil || v2[i].ETag == nil {
return false
}
if *version.ETag != *v2[i].ETag {
return false
}
}
return true
}
func compareDelMarkers(d1, d2 []types.DeleteMarkerEntry) bool {
if len(d1) != len(d2) {
return false
}
for i, dEntry := range d1 {
if dEntry.Key == nil || d2[i].Key == nil {
return false
}
if *dEntry.Key != *d2[i].Key {
return false
}
if dEntry.IsLatest == nil || d2[i].IsLatest == nil {
return false
}
if *dEntry.IsLatest != *d2[i].IsLatest {
return false
}
if dEntry.VersionId == nil || d2[i].VersionId == nil {
return false
}
if *dEntry.VersionId != *d2[i].VersionId {
return false
}
}
return true
}
func getBoolPtr(b bool) *bool {
return &b
}