Merge pull request #906 from versity/fix/suspended-vers-del-markers

fix: Fixes multiple null versionId delete markers creation with Delet…
This commit is contained in:
Ben McClelland
2024-10-21 13:53:59 -07:00
committed by GitHub
3 changed files with 93 additions and 9 deletions

View File

@@ -2435,10 +2435,21 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
acct = auth.Account{}
}
// Creates a new version in the versioning directory
_, err = p.createObjVersion(bucket, object, fi.Size(), acct)
if err != nil {
return nil, err
// Get object versionId
vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) && !errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("get obj versionId: %w", err)
}
if errors.Is(err, meta.ErrNoSuchKey) {
vId = []byte(nullVersionId)
}
// Creates a new object version in the versioning directory
if p.isBucketVersioningEnabled(vStatus) || string(vId) != nullVersionId {
_, err = p.createObjVersion(bucket, object, fi.Size(), acct)
if err != nil {
return nil, err
}
}
// Mark the object as a delete marker
@@ -2446,11 +2457,20 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
if err != nil {
return nil, fmt.Errorf("set delete marker: %w", err)
}
// Generate & set a unique versionId for the delete marker
versionId := ulid.Make().String()
err = p.meta.StoreAttribute(nil, bucket, object, versionIdKey, []byte(versionId))
if err != nil {
return nil, fmt.Errorf("set versionId: %w", err)
versionId := nullVersionId
if p.isBucketVersioningEnabled(vStatus) {
// Generate & set a unique versionId for the delete marker
versionId = ulid.Make().String()
err = p.meta.StoreAttribute(nil, bucket, object, versionIdKey, []byte(versionId))
if err != nil {
return nil, fmt.Errorf("set versionId: %w", err)
}
} else {
err = p.meta.DeleteAttribute(bucket, object, versionIdKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("delete versionId: %w", err)
}
}
return &s3.DeleteObjectOutput{

View File

@@ -572,6 +572,7 @@ func TestVersioning(s *S3Conf) {
Versioning_DeleteObject_non_existing_object(s)
Versioning_DeleteObject_delete_a_delete_marker(s)
Versioning_Delete_null_versionId_object(s)
Versioning_DeleteObject_suspended(s)
Versioning_DeleteObjects_success(s)
Versioning_DeleteObjects_delete_deleteMarkers(s)
// ListObjectVersions
@@ -959,6 +960,7 @@ func GetIntTests() IntTests {
"Versioning_DeleteObject_non_existing_object": Versioning_DeleteObject_non_existing_object,
"Versioning_DeleteObject_delete_a_delete_marker": Versioning_DeleteObject_delete_a_delete_marker,
"Versioning_Delete_null_versionId_object": Versioning_Delete_null_versionId_object,
"Versioning_DeleteObject_suspended": Versioning_DeleteObject_suspended,
"Versioning_DeleteObjects_success": Versioning_DeleteObjects_success,
"Versioning_DeleteObjects_delete_deleteMarkers": Versioning_DeleteObjects_delete_deleteMarkers,
"ListObjectVersions_non_existing_bucket": ListObjectVersions_non_existing_bucket,

View File

@@ -11688,6 +11688,68 @@ func Versioning_Delete_null_versionId_object(s *S3Conf) error {
})
}
func Versioning_DeleteObject_suspended(s *S3Conf) error {
testName := "Versioning_DeleteObject_suspended"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
versions, err := createObjVersions(s3client, bucket, obj, 1)
if err != nil {
return err
}
versions[0].IsLatest = getBoolPtr(false)
err = putBucketVersioningStatus(s3client, bucket, types.BucketVersioningStatusSuspended)
if err != nil {
return err
}
for i := 0; i < 5; i++ {
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: &bucket,
Key: &obj,
})
cancel()
if err != nil {
return err
}
if !*res.DeleteMarker {
return fmt.Errorf("expected the delete marker to be true, instead got %v", *res.DeleteMarker)
}
if *res.VersionId != nullVersionId {
return fmt.Errorf("expected the versionId to be %v, instead got %v", nullVersionId, *res.VersionId)
}
}
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.ListObjectVersions(ctx, &s3.ListObjectVersionsInput{
Bucket: &bucket,
})
cancel()
if err != nil {
return err
}
delMarkers := []types.DeleteMarkerEntry{
{
IsLatest: getBoolPtr(true),
Key: &obj,
VersionId: &nullVersionId,
},
}
if !compareVersions(res.Versions, versions) {
return fmt.Errorf("expected the versions to be %v, instead got %v", versions, res.Versions)
}
if !compareDelMarkers(res.DeleteMarkers, delMarkers) {
return fmt.Errorf("expected the delete markers to be %v, instead got %v", delMarkers, res.DeleteMarkers)
}
return nil
}, withVersioning(types.BucketVersioningStatusEnabled))
}
func Versioning_DeleteObjects_success(s *S3Conf) error {
testName := "Versioning_DeleteObjects_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {