simplify and optimize deleting multiple versions of object (#2153)

This commit is contained in:
Harshavardhana
2022-06-28 20:25:50 -07:00
committed by GitHub
parent b518810106
commit ff93109b57
5 changed files with 95 additions and 124 deletions

View File

@@ -658,79 +658,57 @@ func getDeleteMultiplePathsResponse(session *models.Principal, params objectApi.
func deleteObjects(ctx context.Context, client MCClient, bucket string, path string, versionID string, recursive bool, allVersions bool, nonCurrentVersionsOnly bool) error {
// Delete All non-Current versions only.
if nonCurrentVersionsOnly {
if err := deleteNonCurrentVersions(ctx, client, bucket, path); err != nil {
return err
}
return nil
return deleteNonCurrentVersions(ctx, client, bucket, path)
}
if allVersions {
if err := deleteMultipleObjects(ctx, client, recursive, true); err != nil {
return err
}
}
if recursive {
if err := deleteMultipleObjects(ctx, client, recursive, false); err != nil {
return err
}
} else {
if err := deleteSingleObject(ctx, client, bucket, path, versionID); err != nil {
return err
}
return deleteMultipleObjects(ctx, client, recursive, allVersions)
}
return nil
return deleteSingleObject(ctx, client, bucket, path, versionID)
}
// deleteMultipleObjects uses listing before removal, it can list recursively or not,
// Use cases:
// * Remove objects recursively
func deleteMultipleObjects(ctx context.Context, client MCClient, recursive bool, allVersions bool) error {
isRemoveBucket := false
isIncomplete := false
isBypass := false
listOpts := mc.ListOptions{Recursive: recursive, Incomplete: isIncomplete, ShowDir: mc.DirNone, WithOlderVersions: allVersions, WithDeleteMarkers: allVersions}
// TODO: support older Versions
contentCh := make(chan *mc.ClientContent, 1)
isIncomplete := false
isRemoveBucket := false
listOpts := mc.ListOptions{
Recursive: recursive,
Incomplete: isIncomplete,
ShowDir: mc.DirNone,
WithOlderVersions: allVersions,
WithDeleteMarkers: allVersions,
}
resultCh := client.remove(ctx, isIncomplete, isRemoveBucket, isBypass, contentCh)
OUTER_LOOP:
for content := range client.list(ctx, listOpts) {
if content.Err != nil {
if _, ok := content.Err.ToGoError().(mc.PathInsufficientPermission); ok {
// Ignore Permission errors.
lctx, cancel := context.WithCancel(ctx)
defer cancel()
contentCh := make(chan *mc.ClientContent)
go func() {
defer close(contentCh)
for content := range client.list(lctx, listOpts) {
if content.Err != nil {
continue
}
close(contentCh)
return content.Err.Cause
}
sent := false
for !sent {
select {
case contentCh <- content:
sent = true
case result := <-resultCh:
if result.Err != nil {
if _, ok := result.Err.ToGoError().(mc.PathInsufficientPermission); ok {
// Ignore Permission errors.
continue
}
close(contentCh)
return result.Err.Cause
}
break OUTER_LOOP
case <-lctx.Done():
return
}
}
}
close(contentCh)
for result := range resultCh {
}()
for result := range client.remove(ctx, isIncomplete, isRemoveBucket, isBypass, contentCh) {
if result.Err != nil {
if _, ok := result.Err.ToGoError().(mc.PathInsufficientPermission); ok {
// Ignore Permission errors.
continue
}
return result.Err.Cause
}
}
return nil
}
@@ -740,17 +718,13 @@ func deleteSingleObject(ctx context.Context, client MCClient, bucket, object str
contentCh <- &mc.ClientContent{URL: *newClientURL(targetURL), VersionID: versionID}
close(contentCh)
isRemoveBucket := false
isIncomplete := false
isBypass := false
isIncomplete := false
isRemoveBucket := false
resultCh := client.remove(ctx, isIncomplete, isRemoveBucket, isBypass, contentCh)
for result := range resultCh {
if result.Err != nil {
if _, ok := result.Err.ToGoError().(mc.PathInsufficientPermission); ok {
// Ignore Permission errors.
continue
}
return result.Err.Cause
}
}
@@ -758,17 +732,40 @@ func deleteSingleObject(ctx context.Context, client MCClient, bucket, object str
}
func deleteNonCurrentVersions(ctx context.Context, client MCClient, bucket, path string) error {
// Get current object versions
for lsObj := range client.list(ctx, mc.ListOptions{WithDeleteMarkers: true, WithOlderVersions: true, Recursive: true}) {
if lsObj.Err != nil {
return errors.New(lsObj.Err.String())
}
lctx, cancel := context.WithCancel(ctx)
defer cancel()
if !lsObj.IsLatest {
err := deleteSingleObject(ctx, client, bucket, path, lsObj.VersionID)
if err != nil {
return err
contentCh := make(chan *mc.ClientContent)
go func() {
defer close(contentCh)
// Get current object versions
for lsObj := range client.list(lctx, mc.ListOptions{
WithDeleteMarkers: true,
WithOlderVersions: true,
Recursive: true,
}) {
if lsObj.Err != nil {
continue
}
if lsObj.IsLatest {
continue
}
// All non-current objects proceed to purge.
select {
case contentCh <- lsObj:
case <-lctx.Done():
return
}
}
}()
for result := range client.remove(ctx, false, false, false, contentCh) {
if result.Err != nil {
return result.Err.Cause
}
}