feat: Added integration tests for bucket object versioning. Made a couple of bug fixes in the versioning implementation

This commit is contained in:
jonaustin09
2024-08-08 15:45:15 -04:00
committed by Ben McClelland
parent 8252ecd452
commit 6d4ff09d6f
11 changed files with 1579 additions and 161 deletions

View File

@@ -152,7 +152,7 @@ func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, erro
return nil, fmt.Errorf("versioning path should be a directory")
}
fmt.Printf("bucket versioning enabled with directory: %v\n", verioningdirAbs)
fmt.Printf("Bucket versioning enabled with directory: %v\n", verioningdirAbs)
}
return &Posix{
@@ -500,6 +500,16 @@ func (p *Posix) GetBucketVersioning(_ context.Context, bucket string) (*s3.GetBu
return &s3.GetBucketVersioningOutput{}, nil
}
// Returns the specified bucket versioning status
func (p *Posix) isBucketVersioningEnabled(ctx context.Context, bucket string) (bool, error) {
res, err := p.GetBucketVersioning(ctx, bucket)
if err != nil {
return false, err
}
return res.Status == types.BucketVersioningStatusEnabled, nil
}
// Generates the object version path in the versioning directory
func (p *Posix) genObjVersionPath(bucket, key string) string {
return filepath.Join(p.versioningDir, bucket, genObjVersionKey(key))
@@ -522,7 +532,7 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun
var versionId string
data, err := p.meta.RetrieveAttribute(bucket, key, versionIdKey)
if err != nil {
if err == nil {
versionId = string(data)
} else {
versionId = ulid.Make().String()
@@ -651,7 +661,7 @@ func (p *Posix) isObjDeleteMarker(bucket, object string) (bool, error) {
// Converts the file to object version. Finds all the object versions,
// delete markers from the versioning directory and returns
func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
return func(path, versionIdMarker string, availableObjCount int, d fs.DirEntry) (*backend.ObjVersionFuncResult, error) {
return func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*backend.ObjVersionFuncResult, error) {
var objects []types.ObjectVersion
var delMarkers []types.DeleteMarkerEntry
// if the number of available objects is 0, return truncated response
@@ -663,8 +673,44 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
}, nil
}
if d.IsDir() {
//TODO: directory objects can't have versions, but they are listed in object versions result?
return nil, backend.ErrSkipObj
// directory object only happens if directory empty
// check to see if this is a directory object by checking etag
etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey)
if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
}
if err != nil {
return nil, fmt.Errorf("get etag: %w", err)
}
etag := string(etagBytes)
fi, err := d.Info()
if errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
}
if err != nil {
return nil, fmt.Errorf("get fileinfo: %w", err)
}
key := path + "/"
// Directory objects don't contain data
size := int64(0)
versionId := "null"
objects = append(objects, types.ObjectVersion{
ETag: &etag,
Key: &key,
LastModified: backend.GetTimePtr(fi.ModTime()),
IsLatest: getBoolPtr(true),
Size: &size,
VersionId: &versionId,
})
return &backend.ObjVersionFuncResult{
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: availableObjCount == 1,
}, nil
}
// file object, get object info and fill out object data
@@ -685,47 +731,58 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
if err == nil {
versionId = string(versionIdBytes)
}
fi, err := d.Info()
if errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
if versionId == versionIdMarker {
*pastVersionIdMarker = true
}
if err != nil {
return nil, fmt.Errorf("get fileinfo: %w", err)
if *pastVersionIdMarker {
fi, err := d.Info()
if errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
}
if err != nil {
return nil, fmt.Errorf("get fileinfo: %w", err)
}
size := fi.Size()
isDel, err := p.isObjDeleteMarker(bucket, path)
if err != nil {
return nil, err
}
if isDel {
delMarkers = append(delMarkers, types.DeleteMarkerEntry{
IsLatest: getBoolPtr(true),
VersionId: &versionId,
LastModified: backend.GetTimePtr(fi.ModTime()),
Key: &path,
})
} else {
objects = append(objects, types.ObjectVersion{
ETag: &etag,
Key: &path,
LastModified: backend.GetTimePtr(fi.ModTime()),
Size: &size,
VersionId: &versionId,
IsLatest: getBoolPtr(true),
})
}
availableObjCount--
if availableObjCount == 0 {
return &backend.ObjVersionFuncResult{
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: true,
NextVersionIdMarker: versionId,
}, nil
}
}
size := fi.Size()
isDel, err := p.isObjDeleteMarker(bucket, path)
if err != nil {
return nil, err
}
if isDel {
delMarkers = append(delMarkers, types.DeleteMarkerEntry{
IsLatest: getBoolPtr(true),
VersionId: &versionId,
LastModified: backend.GetTimePtr(fi.ModTime()),
Key: &path,
})
} else {
objects = append(objects, types.ObjectVersion{
ETag: &etag,
Key: &path,
LastModified: backend.GetTimePtr(fi.ModTime()),
Size: &size,
VersionId: &versionId,
IsLatest: getBoolPtr(true),
})
}
availableObjCount--
if availableObjCount == 0 {
if !p.versioningEnabled() {
return &backend.ObjVersionFuncResult{
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: true,
NextVersionIdMarker: versionId,
ObjectVersions: objects,
DelMarkers: delMarkers,
}, nil
}
@@ -763,6 +820,13 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
versionId := f.Name()
size := f.Size()
if !*pastVersionIdMarker {
if versionId == versionIdMarker {
*pastVersionIdMarker = true
}
continue
}
etagBytes, err := p.meta.RetrieveAttribute(versionPath, versionId, etagkey)
if errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
@@ -782,8 +846,9 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
if isDel {
delMarkers = append(delMarkers, types.DeleteMarkerEntry{
VersionId: &versionId,
LastModified: backend.GetTimePtr(fi.ModTime()),
LastModified: backend.GetTimePtr(f.ModTime()),
Key: &path,
IsLatest: getBoolPtr(false),
})
} else {
objects = append(objects, types.ObjectVersion{
@@ -792,6 +857,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
LastModified: backend.GetTimePtr(f.ModTime()),
Size: &size,
VersionId: &versionId,
IsLatest: getBoolPtr(false),
})
}
@@ -1870,6 +1936,11 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}, nil
}
vEnabled, err := p.isBucketVersioningEnabled(ctx, *po.Bucket)
if err != nil {
return s3response.PutObjectOutput{}, err
}
// object is file
d, err := os.Stat(name)
if err == nil && d.IsDir() {
@@ -1877,7 +1948,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}
// if the versioninng is enabled first create the file object version
if p.versioningEnabled() && err == nil {
if p.versioningEnabled() && vEnabled && err == nil {
_, err := p.createObjVersion(*po.Bucket, *po.Key, d.Size(), acct)
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("create object version: %w", err)
@@ -1987,7 +2058,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
// if the versioning is enabled, generate a new versionID for the object
var versionID string
if p.versioningEnabled() {
if p.versioningEnabled() && vEnabled {
versionID = ulid.Make().String()
if err := p.meta.StoreAttribute(*po.Bucket, *po.Key, versionIdKey, []byte(versionID)); err != nil {
@@ -2011,6 +2082,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
bucket := *input.Bucket
object := *input.Key
isDir := strings.HasSuffix(object, "/")
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
@@ -2022,17 +2094,36 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
objpath := filepath.Join(bucket, object)
if p.versioningEnabled() {
if *input.VersionId == "" {
vEnabled, err := p.isBucketVersioningEnabled(ctx, bucket)
if err != nil {
return nil, err
}
// Directory objects can't have versions
if !isDir && p.versioningEnabled() && vEnabled {
if getString(input.VersionId) == "" {
// if the versionId is not specified, make the current version a delete marker
_, err := os.Stat(objpath)
fi, err := os.Stat(objpath)
if err != nil {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
}
// Creates a new version in the versioning directory
_, err = p.createObjVersion(bucket, object, fi.Size(), acct)
if err != nil {
return nil, err
}
// Mark the object as a delete marker
if err := p.meta.StoreAttribute(bucket, object, deleteMarkerKey, []byte{}); err != nil {
return nil, fmt.Errorf("set delete marker: %w", err)
}
// Generate & set a unique versionId for the delete marker
versionId := ulid.Make().String()
if err := p.meta.StoreAttribute(bucket, object, versionIdKey, []byte(versionId)); err != nil {
return nil, fmt.Errorf("set versionId: %w", err)
@@ -2042,7 +2133,6 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
VersionId: &versionId,
}, nil
} else {
delMarker := true
versionPath := p.genObjVersionPath(bucket, object)
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
@@ -2054,7 +2144,12 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
// if the specified VersionId is the same as in the latest version,
// remove the latest version, find the latest version from the versioning
// directory and move to the place of the deleted object, to make it the latest
err := os.Remove(objpath)
isDelMarker, err := p.isObjDeleteMarker(bucket, object)
if err != nil {
return nil, err
}
err = os.Remove(objpath)
if err != nil {
return nil, fmt.Errorf("remove obj version: %w", err)
}
@@ -2062,7 +2157,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
ents, err := os.ReadDir(versionPath)
if errors.Is(err, fs.ErrNotExist) {
return &s3.DeleteObjectOutput{
DeleteMarker: &delMarker,
DeleteMarker: &isDelMarker,
VersionId: input.VersionId,
}, nil
}
@@ -2072,7 +2167,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
if len(ents) == 0 {
return &s3.DeleteObjectOutput{
DeleteMarker: &delMarker,
DeleteMarker: &isDelMarker,
VersionId: input.VersionId,
}, nil
}
@@ -2128,15 +2223,17 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
}
return &s3.DeleteObjectOutput{
DeleteMarker: &delMarker,
DeleteMarker: &isDelMarker,
VersionId: input.VersionId,
}, nil
}
isDelMarker, _ := p.isObjDeleteMarker(versionPath, *input.VersionId)
err = os.Remove(filepath.Join(versionPath, *input.VersionId))
if errors.Is(err, fs.ErrNotExist) {
return &s3.DeleteObjectOutput{
DeleteMarker: &delMarker,
DeleteMarker: &isDelMarker,
VersionId: input.VersionId,
}, nil
}
@@ -2145,11 +2242,10 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
}
return &s3.DeleteObjectOutput{
DeleteMarker: &delMarker,
DeleteMarker: &isDelMarker,
VersionId: input.VersionId,
}, nil
}
}
fi, err := os.Stat(objpath)
@@ -2199,7 +2295,7 @@ func (p *Posix) removeParents(bucket, object string) error {
for {
parent := filepath.Dir(objPath)
if parent == "." {
if parent == string(filepath.Separator) || parent == "." {
// stop removing parents if we hit the bucket directory.
break
}
@@ -2228,15 +2324,22 @@ func (p *Posix) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput)
for _, obj := range input.Delete.Objects {
//TODO: Make the delete operation concurrent
res, err := p.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: input.Bucket,
Key: obj.Key,
Bucket: input.Bucket,
Key: obj.Key,
VersionId: obj.VersionId,
})
if err == nil {
delResult = append(delResult, types.DeletedObject{
delEntity := types.DeletedObject{
Key: obj.Key,
VersionId: res.VersionId,
DeleteMarker: res.DeleteMarker,
})
}
if delEntity.DeleteMarker != nil && *delEntity.DeleteMarker {
delEntity.DeleteMarkerVersionId = res.VersionId
} else {
delEntity.VersionId = res.VersionId
}
delResult = append(delResult, delEntity)
} else {
serr, ok := err.(s3err.APIError)
if ok {
@@ -2310,6 +2413,9 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO
fi, err := os.Stat(objPath)
if errors.Is(err, fs.ErrNotExist) {
if *input.VersionId != "" {
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if errors.Is(err, syscall.ENAMETOOLONG) {
@@ -2508,6 +2614,14 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
}, nil
}
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
}
if *input.VersionId != "" {
vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
@@ -2527,14 +2641,6 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
}
}
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
}
if err != nil {
return nil, fmt.Errorf("stat bucket: %w", err)
}
objPath := filepath.Join(bucket, object)
fi, err := os.Stat(objPath)
@@ -2663,7 +2769,11 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
copySource := cSplitted[0]
var srcVersionId string
if len(cSplitted) > 1 {
srcVersionId = cSplitted[1]
versionIdParts := strings.Split(cSplitted[1], "=")
if len(versionIdParts) != 2 || versionIdParts[0] != "versionId" {
return nil, s3err.GetAPIError(s3err.ErrInvalidRequest)
}
srcVersionId = versionIdParts[1]
}
srcBucket, srcObject, ok := strings.Cut(copySource, "/")
@@ -2681,9 +2791,28 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
return nil, fmt.Errorf("stat bucket: %w", err)
}
vEnabled, err := p.isBucketVersioningEnabled(ctx, srcBucket)
if err != nil {
return nil, err
}
if srcVersionId != "" {
srcBucket = filepath.Join(p.versioningDir, srcBucket)
srcObject = filepath.Join(genObjVersionKey(srcObject), srcVersionId)
if !p.versioningEnabled() || !vEnabled {
return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId)
}
vId, err := p.meta.RetrieveAttribute(srcBucket, srcObject, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get src object version id: %w", err)
}
if string(vId) != srcVersionId {
srcBucket = filepath.Join(p.versioningDir, srcBucket)
srcObject = filepath.Join(genObjVersionKey(srcObject), srcVersionId)
}
}
_, err = os.Stat(dstBucket)
@@ -2697,6 +2826,9 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
objPath := filepath.Join(srcBucket, srcObject)
f, err := os.Open(objPath)
if errors.Is(err, fs.ErrNotExist) {
if p.versioningEnabled() && vEnabled {
return nil, s3err.GetAPIError(s3err.ErrNoSuchVersion)
}
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if errors.Is(err, syscall.ENAMETOOLONG) {
@@ -2865,6 +2997,12 @@ func (p *Posix) fileToObj(bucket string) backend.GetObjFunc {
}, nil
}
// If the object is a delete marker, skip
isDel, _ := p.isObjDeleteMarker(bucket, path)
if isDel {
return s3response.Object{}, backend.ErrSkipObj
}
// file object, get object info and fill out object data
etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey)
if errors.Is(err, fs.ErrNotExist) {

View File

@@ -263,7 +263,7 @@ type ObjVersionFuncResult struct {
Truncated bool
}
type GetVersionsFunc func(path, versionIdMarker string, availableObjCount int, d fs.DirEntry) (*ObjVersionFuncResult, error)
type GetVersionsFunc func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*ObjVersionFuncResult, error)
// WalkVersions walks the supplied fs.FS and returns results compatible with
// ListObjectVersions action response
@@ -280,6 +280,8 @@ func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyM
var nextVersionIdMarker string
var truncated bool
pastVersionIdMarker := versionIdMarker == ""
err := fs.WalkDir(fileSystem, ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
@@ -295,6 +297,15 @@ func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyM
return fs.SkipDir
}
if !pastMarker {
if path == keyMarker {
pastMarker = true
}
if path < keyMarker {
return nil
}
}
if d.IsDir() {
// If prefix is defined and the directory does not match prefix,
// do not descend into the directory because nothing will
@@ -309,18 +320,23 @@ func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyM
return fs.SkipDir
}
// skip directory objects, as they can't have versions
return nil
}
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
if err == ErrSkipObj {
return nil
}
if err != nil {
return fmt.Errorf("directory to object %q: %w", path, err)
}
objects = append(objects, res.ObjectVersions...)
delMarkers = append(delMarkers, res.DelMarkers...)
if res.Truncated {
truncated = true
nextMarker = path
nextVersionIdMarker = res.NextVersionIdMarker
return fs.SkipAll
}
if !pastMarker {
if path == keyMarker {
pastMarker = true
return nil
}
if path < keyMarker {
return nil
}
return nil
}
// If object doesn't have prefix, don't include in results.
@@ -331,7 +347,7 @@ func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyM
if delimiter == "" {
// If no delimiter specified, then all files with matching
// prefix are included in results
res, err := getObj(path, versionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
if err == ErrSkipObj {
return nil
}
@@ -374,7 +390,7 @@ func WalkVersions(ctx context.Context, fileSystem fs.FS, prefix, delimiter, keyM
suffix := strings.TrimPrefix(path, prefix)
before, _, found := strings.Cut(suffix, delimiter)
if !found {
res, err := getObj(path, versionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
res, err := getObj(path, versionIdMarker, &pastVersionIdMarker, max-len(objects)-len(delMarkers)-len(cpmap), d)
if err == ErrSkipObj {
return nil
}

View File

@@ -22,20 +22,21 @@ import (
)
var (
awsID string
awsSecret string
endpoint string
prefix string
dstBucket string
partSize int64
objSize int64
concurrency int
files int
totalReqs int
upload bool
download bool
pathStyle bool
checksumDisable bool
awsID string
awsSecret string
endpoint string
prefix string
dstBucket string
partSize int64
objSize int64
concurrency int
files int
totalReqs int
upload bool
download bool
pathStyle bool
checksumDisable bool
versioningEnabled bool
)
func testCommand() *cli.Command {
@@ -87,6 +88,14 @@ func initTestCommands() []*cli.Command {
Usage: "Tests the full flow of gateway.",
Description: `Runs all the available tests to test the full flow of the gateway.`,
Action: getAction(integration.TestFullFlow),
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "versioning-enabled",
Usage: "Test the bucket object versioning, if the versioning is enabled",
Destination: &versioningEnabled,
Aliases: []string{"vs"},
},
},
},
{
Name: "posix",
@@ -276,6 +285,9 @@ func getAction(tf testFunc) func(*cli.Context) error {
if debug {
opts = append(opts, integration.WithDebug())
}
if versioningEnabled {
opts = append(opts, integration.WithVersioningEnabled())
}
s := integration.NewS3Conf(opts...)
tf(s)

2
go.mod
View File

@@ -16,6 +16,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/vault-client-go v0.4.3
github.com/nats-io/nats.go v1.37.0
github.com/oklog/ulid/v2 v2.1.0
github.com/pkg/xattr v0.4.10
github.com/segmentio/kafka-go v0.4.47
github.com/smira/go-statsd v1.3.3
@@ -45,7 +46,6 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect

View File

@@ -5,9 +5,11 @@ rm -rf /tmp/gw
mkdir /tmp/gw
rm -rf /tmp/covdata
mkdir /tmp/covdata
rm -rf /tmp/versioningdir
mkdir /tmp/versioningdir
# run server in background
GOCOVERDIR=/tmp/covdata ./versitygw -a user -s pass --iam-dir /tmp/gw posix /tmp/gw &
GOCOVERDIR=/tmp/covdata ./versitygw -a user -s pass --iam-dir /tmp/gw posix --versioning-dir /tmp/versioningdir /tmp/gw &
GW_PID=$!
# wait a second for server to start up
@@ -21,7 +23,7 @@ fi
# run tests
# full flow tests
if ! ./versitygw test -a user -s pass -e http://127.0.0.1:7070 full-flow; then
if ! ./versitygw test -a user -s pass -e http://127.0.0.1:7070 full-flow -vs; then
echo "full flow tests failed"
kill $GW_PID
exit 1

View File

@@ -495,6 +495,15 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
utils.SetMetaHeaders(ctx, res.Metadata)
// Set other response headers
utils.SetResponseHeaders(ctx, hdrs)
// Set version id header
if getstring(res.VersionId) != "" {
utils.SetResponseHeaders(ctx, []utils.CustomHeader{
{
Key: "x-amz-version-id",
Value: getstring(res.VersionId),
},
})
}
status := http.StatusOK
if acceptRange != "" {
@@ -2945,6 +2954,7 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error {
Value: getstring(res.VersionId),
})
}
utils.SetResponseHeaders(ctx, headers)
return SendResponse(ctx, nil,

View File

@@ -133,6 +133,7 @@ const (
ErrInvalidMetadataDirective
ErrKeyTooLong
ErrInvalidVersionId
ErrNoSuchVersion
// Non-AWS errors
ErrExistingObjectIsDirectory
@@ -531,6 +532,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Your key is too long.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrNoSuchVersion: {
Code: "NoSuchVersion",
Description: "The specified version does not exist.",
HTTPStatusCode: http.StatusNotFound,
},
// non aws errors
ErrExistingObjectIsDirectory: {

View File

@@ -469,6 +469,9 @@ func TestFullFlow(s *S3Conf) {
TestGetObjectLegalHold(s)
TestWORMProtection(s)
TestAccessControl(s)
if s.versioningEnabled {
TestVersioning(s)
}
}
func TestPosix(s *S3Conf) {
@@ -503,6 +506,34 @@ func TestAccessControl(s *S3Conf) {
AccessControl_copy_object_with_starting_slash_for_user(s)
}
func TestVersioning(s *S3Conf) {
PutBucketVersioning_non_existing_bucket(s)
PutBucketVersioning_invalid_status(s)
PutBucketVersioning_success(s)
GetBucketVersioning_non_existing_bucket(s)
GetBucketVersioning_success(s)
Versioning_PutObject_success(s)
Versioning_CopyObject_success(s)
Versioning_CopyObject_non_existing_version_id(s)
Versioning_CopyObject_from_an_object_version(s)
Versioning_HeadObject_invalid_versionId(s)
Versioning_HeadObject_success(s)
Versioning_HeadObject_delete_marker(s)
Versioning_GetObject_invalid_versionId(s)
Versioning_GetObject_success(s)
Versioning_GetObject_delete_marker(s)
Versioning_DeleteObject_delete_object_version(s)
Versioning_DeleteObject_delete_a_delete_marker(s)
Versioning_DeleteObjects_success(s)
Versioning_DeleteObjects_delete_deleteMarkers(s)
// ListObjectVersions
ListObjectVersions_non_existing_bucket(s)
ListObjectVersions_list_single_object_versions(s)
ListObjectVersions_list_multiple_object_versions(s)
ListObjectVersions_multiple_object_versions_truncated(s)
ListObjectVersions_with_delete_markers(s)
}
type IntTests map[string]func(s *S3Conf) error
func GetIntTests() IntTests {
@@ -812,5 +843,29 @@ func GetIntTests() IntTests {
"AccessControl_root_PutBucketAcl": AccessControl_root_PutBucketAcl,
"AccessControl_user_PutBucketAcl_with_policy_access": AccessControl_user_PutBucketAcl_with_policy_access,
"AccessControl_copy_object_with_starting_slash_for_user": AccessControl_copy_object_with_starting_slash_for_user,
"PutBucketVersioning_non_existing_bucket": PutBucketVersioning_non_existing_bucket,
"PutBucketVersioning_invalid_status": PutBucketVersioning_invalid_status,
"PutBucketVersioning_success": PutBucketVersioning_success,
"GetBucketVersioning_non_existing_bucket": GetBucketVersioning_non_existing_bucket,
"GetBucketVersioning_success": GetBucketVersioning_success,
"Versioning_PutObject_success": Versioning_PutObject_success,
"Versioning_CopyObject_success": Versioning_CopyObject_success,
"Versioning_CopyObject_non_existing_version_id": Versioning_CopyObject_non_existing_version_id,
"Versioning_CopyObject_from_an_object_version": Versioning_CopyObject_from_an_object_version,
"Versioning_HeadObject_invalid_versionId": Versioning_HeadObject_invalid_versionId,
"Versioning_HeadObject_success": Versioning_HeadObject_success,
"Versioning_HeadObject_delete_marker": Versioning_HeadObject_delete_marker,
"Versioning_GetObject_invalid_versionId": Versioning_GetObject_invalid_versionId,
"Versioning_GetObject_success": Versioning_GetObject_success,
"Versioning_GetObject_delete_marker": Versioning_GetObject_delete_marker,
"Versioning_DeleteObject_delete_object_version": Versioning_DeleteObject_delete_object_version,
"Versioning_DeleteObject_delete_a_delete_marker": Versioning_DeleteObject_delete_a_delete_marker,
"Versioning_DeleteObjects_success": Versioning_DeleteObjects_success,
"Versioning_DeleteObjects_delete_deleteMarkers": Versioning_DeleteObjects_delete_deleteMarkers,
"ListObjectVersions_non_existing_bucket": ListObjectVersions_non_existing_bucket,
"ListObjectVersions_list_single_object_versions": ListObjectVersions_list_single_object_versions,
"ListObjectVersions_list_multiple_object_versions": ListObjectVersions_list_multiple_object_versions,
"ListObjectVersions_multiple_object_versions_truncated": ListObjectVersions_multiple_object_versions_truncated,
"ListObjectVersions_with_delete_markers": ListObjectVersions_with_delete_markers,
}
}

View File

@@ -31,15 +31,16 @@ import (
)
type S3Conf struct {
awsID string
awsSecret string
awsRegion string
endpoint string
checksumDisable bool
pathStyle bool
PartSize int64
Concurrency int
debug bool
awsID string
awsSecret string
awsRegion string
endpoint string
checksumDisable bool
pathStyle bool
PartSize int64
Concurrency int
debug bool
versioningEnabled bool
}
func NewS3Conf(opts ...Option) *S3Conf {
@@ -80,6 +81,9 @@ func WithConcurrency(c int) Option {
func WithDebug() Option {
return func(s *S3Conf) { s.debug = true }
}
func WithVersioningEnabled() Option {
return func(s *S3Conf) { s.versioningEnabled = true }
}
func (c *S3Conf) getCreds() credentials.StaticCredentialsProvider {
// TODO support token/IAM

File diff suppressed because it is too large Load Diff

View File

@@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
"math/big"
rnd "math/rand"
"net/http"
"net/url"
@@ -70,7 +71,25 @@ func setup(s *S3Conf, bucket string, opts ...setupOpt) error {
ObjectOwnership: cfg.Ownership,
})
cancel()
return err
if err != nil {
return err
}
if cfg.VersioningEnabled {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err := s3client.PutBucketVersioning(ctx, &s3.PutBucketVersioningInput{
Bucket: &bucket,
VersioningConfiguration: &types.VersioningConfiguration{
Status: types.BucketVersioningStatusEnabled,
},
})
cancel()
if err != nil {
return err
}
}
return nil
}
func teardown(s *S3Conf, bucket string) error {
@@ -90,24 +109,31 @@ func teardown(s *S3Conf, bucket string) error {
return nil
}
in := &s3.ListObjectsV2Input{Bucket: &bucket}
in := &s3.ListObjectVersionsInput{Bucket: &bucket}
for {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
out, err := s3client.ListObjectsV2(ctx, in)
out, err := s3client.ListObjectVersions(ctx, in)
cancel()
if err != nil {
return fmt.Errorf("failed to list objects: %w", err)
}
for _, item := range out.Contents {
err = deleteObject(&bucket, item.Key, nil)
for _, item := range out.Versions {
err = deleteObject(&bucket, item.Key, item.VersionId)
if err != nil {
return err
}
}
for _, item := range out.DeleteMarkers {
err = deleteObject(&bucket, item.Key, item.VersionId)
if err != nil {
return err
}
}
if out.IsTruncated != nil && *out.IsTruncated {
in.ContinuationToken = out.ContinuationToken
in.KeyMarker = out.KeyMarker
in.VersionIdMarker = out.NextVersionIdMarker
} else {
break
}
@@ -122,8 +148,9 @@ func teardown(s *S3Conf, bucket string) error {
}
type setupCfg struct {
LockEnabled bool
Ownership types.ObjectOwnership
LockEnabled bool
VersioningEnabled bool
Ownership types.ObjectOwnership
}
type setupOpt func(*setupCfg)
@@ -134,6 +161,9 @@ func withLock() setupOpt {
func withOwnership(o types.ObjectOwnership) setupOpt {
return func(s *setupCfg) { s.Ownership = o }
}
func withVersioning() setupOpt {
return func(s *setupCfg) { s.VersioningEnabled = true }
}
func actionHandler(s *S3Conf, testName string, handler func(s3client *s3.Client, bucket string) error, opts ...setupOpt) error {
runF(testName)
@@ -383,18 +413,31 @@ func contains(s []string, e string) bool {
return false
}
func putObjectWithData(lgth int64, input *s3.PutObjectInput, client *s3.Client) (csum [32]byte, data []byte, err error) {
data = make([]byte, lgth)
type putObjectOutput struct {
csum [32]byte
data []byte
res *s3.PutObjectOutput
}
func putObjectWithData(lgth int64, input *s3.PutObjectInput, client *s3.Client) (*putObjectOutput, error) {
data := make([]byte, lgth)
rand.Read(data)
csum = sha256.Sum256(data)
csum := sha256.Sum256(data)
r := bytes.NewReader(data)
input.Body = r
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = client.PutObject(ctx, input)
res, err := client.PutObject(ctx, input)
cancel()
if err != nil {
return nil, err
}
return
return &putObjectOutput{
csum: csum,
data: data,
res: res,
}, nil
}
func createMp(s3client *s3.Client, bucket, key string) (*s3.CreateMultipartUploadOutput, error) {
@@ -592,21 +635,40 @@ func comparePrefixes(list1 []string, list2 []types.CommonPrefix) bool {
return true
}
func compareDelObjects(list1 []string, list2 []types.DeletedObject) bool {
func compareDelObjects(list1, list2 []types.DeletedObject) bool {
if len(list1) != len(list2) {
return false
}
elementMap := make(map[string]bool)
for _, elem := range list1 {
elementMap[elem] = true
}
for _, elem := range list2 {
if _, found := elementMap[*elem.Key]; !found {
for i, obj := range list1 {
if *obj.Key != *list2[i].Key {
return false
}
if obj.VersionId != nil {
if list2[i].VersionId == nil {
return false
}
if *obj.VersionId != *list2[i].VersionId {
return false
}
}
if obj.DeleteMarkerVersionId != nil {
if list2[i].DeleteMarkerVersionId == nil {
return false
}
if *obj.DeleteMarkerVersionId != *list2[i].DeleteMarkerVersionId {
return false
}
}
if obj.DeleteMarker != nil {
if list2[i].DeleteMarker == nil {
return false
}
if *obj.DeleteMarker != *list2[i].DeleteMarker {
return false
}
}
}
return true
@@ -860,3 +922,124 @@ func pfxStrings(pfxs []types.CommonPrefix) []string {
}
return pfxStrs
}
func createObjVersions(client *s3.Client, bucket, object string, count int) ([]types.ObjectVersion, error) {
versions := []types.ObjectVersion{}
for i := 0; i < count; i++ {
rNumber, err := rand.Int(rand.Reader, big.NewInt(100000))
dataLength := rNumber.Int64()
if err != nil {
return nil, err
}
r, err := putObjectWithData(dataLength, &s3.PutObjectInput{
Bucket: &bucket,
Key: &object,
}, client)
if err != nil {
return nil, err
}
isLatest := i == count-1
versions = append(versions, types.ObjectVersion{
ETag: r.res.ETag,
IsLatest: &isLatest,
Key: &object,
Size: &dataLength,
VersionId: r.res.VersionId,
})
}
versions = reverseSlice(versions)
return versions, nil
}
// ReverseSlice reverses a slice of any type
func reverseSlice[T any](s []T) []T {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}
func compareVersions(v1, v2 []types.ObjectVersion) bool {
if len(v1) != len(v2) {
return false
}
for i, version := range v1 {
if version.Key == nil || v2[i].Key == nil {
return false
}
if *version.Key != *v2[i].Key {
return false
}
if version.VersionId == nil || v2[i].VersionId == nil {
return false
}
if *version.VersionId != *v2[i].VersionId {
return false
}
if version.IsLatest == nil || v2[i].IsLatest == nil {
return false
}
if *version.IsLatest != *v2[i].IsLatest {
return false
}
if version.Size == nil || v2[i].Size == nil {
return false
}
if *version.Size != *v2[i].Size {
return false
}
if version.ETag == nil || v2[i].ETag == nil {
return false
}
if *version.ETag != *v2[i].ETag {
return false
}
}
return true
}
func compareDelMarkers(d1, d2 []types.DeleteMarkerEntry) bool {
if len(d1) != len(d2) {
return false
}
for i, dEntry := range d1 {
if dEntry.Key == nil || d2[i].Key == nil {
return false
}
if *dEntry.Key != *d2[i].Key {
return false
}
if dEntry.IsLatest == nil || d2[i].IsLatest == nil {
return false
}
if *dEntry.IsLatest != *d2[i].IsLatest {
return false
}
if dEntry.VersionId == nil || d2[i].VersionId == nil {
return false
}
if *dEntry.VersionId != *d2[i].VersionId {
return false
}
}
return true
}
func getBoolPtr(b bool) *bool {
return &b
}