diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 59a7b244e..5733a94db 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1365,6 +1365,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string fvID := mustGetUUID() if markDelete { if opts.Versioned || opts.VersionSuspended { + if !deleteMarker { + // versioning suspended means we add `null` version as + // delete marker, if its not decided already. + deleteMarker = opts.VersionSuspended && opts.VersionID == "" + } fi := FileInfo{ Name: object, Deleted: deleteMarker, @@ -1381,10 +1386,10 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string fi.VersionID = opts.VersionID } } - // versioning suspended means we add `null` - // version as delete marker - // Add delete marker, since we don't have any version specified explicitly. - // Or if a particular version id needs to be replicated. + // versioning suspended means we add `null` version as + // delete marker. Add delete marker, since we don't have + // any version specified explicitly. Or if a particular + // version id needs to be replicated. if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil { return objInfo, toObjectErr(err, bucket, object) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 359961188..5455c985a 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -647,6 +647,10 @@ func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket meta.ObjectLockConfigXML = enabledBucketObjectLockConfig } + if opts.VersioningEnabled { + meta.VersioningConfigXML = enabledBucketVersioningConfig + } + if err := meta.Save(context.Background(), z); err != nil { return toObjectErr(err, bucket) } diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index f8917e042..0feb27995 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -726,6 +726,85 @@ func objInfoNames(o []ObjectInfo) []string { return res } +func TestDeleteObjectVersionMarker(t *testing.T) { + ExecObjectLayerTest(t, testDeleteObjectVersion) +} + +func testDeleteObjectVersion(obj ObjectLayer, instanceType string, t1 TestErrHandler) { + if instanceType == FSTestStr { + return + } + + t, _ := t1.(*testing.T) + + testBuckets := []string{ + "bucket-suspended-version", + "bucket-suspended-version-id", + } + for _, bucket := range testBuckets { + err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{ + VersioningEnabled: true, + }) + if err != nil { + t.Fatalf("%s : %s", instanceType, err) + } + meta, err := loadBucketMetadata(context.Background(), obj, bucket) + if err != nil { + t.Fatalf("%s : %s", instanceType, err) + } + meta.VersioningConfigXML = []byte(`Suspended`) + if err := meta.Save(context.Background(), obj); err != nil { + t.Fatalf("%s : %s", instanceType, err) + } + globalBucketMetadataSys.Set(bucket, meta) + globalNotificationSys.LoadBucketMetadata(context.Background(), bucket) + } + + testObjects := []struct { + parentBucket string + name string + content string + meta map[string]string + versionID string + expectDelMarker bool + }{ + {testBuckets[0], "delete-file", "contentstring", nil, "", true}, + {testBuckets[1], "delete-file", "contentstring", nil, "null", false}, + } + for _, object := range testObjects { + md5Bytes := md5.Sum([]byte(object.content)) + _, err := obj.PutObject(context.Background(), object.parentBucket, object.name, + mustGetPutObjReader(t, bytes.NewBufferString(object.content), + int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket), + UserDefined: object.meta, + }) + if err != nil { + t.Fatalf("%s : %s", instanceType, err) + } + obj, err := obj.DeleteObject(context.Background(), object.parentBucket, object.name, ObjectOptions{ + Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket), + VersionID: object.versionID, + }) + if err != nil { + if object.versionID != "" { + if !isErrVersionNotFound(err) { + t.Fatalf("%s : %s", instanceType, err) + } + } else { + if !isErrObjectNotFound(err) { + t.Fatalf("%s : %s", instanceType, err) + } + } + } + if obj.DeleteMarker != object.expectDelMarker { + t.Fatalf("%s : expected deleted marker %t, found %t", instanceType, object.expectDelMarker, obj.DeleteMarker) + } + } +} + // Wrapper for calling ListObjectVersions tests for both Erasure multiple disks and single node setup. func TestListObjectVersions(t *testing.T) { ExecObjectLayerTest(t, testListObjectVersions) @@ -733,6 +812,10 @@ func TestListObjectVersions(t *testing.T) { // Unit test for ListObjectVersions func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHandler) { + if instanceType == FSTestStr { + return + } + t, _ := t1.(*testing.T) testBuckets := []string{ // This bucket is used for testing ListObject operations. @@ -752,10 +835,6 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand for _, bucket := range testBuckets { err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{VersioningEnabled: true}) if err != nil { - if _, ok := err.(NotImplemented); ok { - // Skip test for FS mode. - continue - } t.Fatalf("%s : %s", instanceType, err.Error()) } } @@ -791,10 +870,6 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta}) if err != nil { - if _, ok := err.(BucketNotFound); ok { - // Skip test failure for FS mode. - continue - } t.Fatalf("%s : %s", instanceType, err.Error()) } @@ -1313,10 +1388,6 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) { result, err := obj.ListObjectVersions(context.Background(), testCase.bucketName, testCase.prefix, testCase.marker, "", testCase.delimiter, int(testCase.maxKeys)) - if _, ok := err.(NotImplemented); ok { - // Not implemented should be skipped - t.Skip() - } if err != nil && testCase.shouldPass { t.Errorf("%s: Expected to pass, but failed with: %s", instanceType, err.Error()) } diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 0674609c5..1ebf3d03c 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -1190,7 +1190,7 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { } return "", len(x.versions) == 0, err case ObjectType: - if updateVersion { + if updateVersion && !fi.Deleted { ver, err := x.getIdx(i) if err != nil { return "", false, err diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 86eb08ffc..15cabf726 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -954,11 +954,11 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F if err != errFileNotFound { return err } + metaDataPoolPut(buf) // Never used, return it if fi.Deleted && forceDelMarker { // Create a new xl.meta with a delete marker in it return s.WriteMetadata(ctx, volume, path, fi) } - metaDataPoolPut(buf) // Never used, return it buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1)) if err != nil { @@ -1016,6 +1016,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F } } } + if !lastVersion { buf, err = xlMeta.AppendTo(metaDataPoolGet()) defer metaDataPoolPut(buf)