mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
feat(s3/lifecycle): metadata-only delete when entry TtlSec > 0 (Phase 2b) (#9390)
* refactor(s3): thread metadataOnly into delete helpers Add a metadataOnly bool to deleteUnversionedObjectWithClient and deleteSpecificObjectVersion. When true the helper sends IsDeleteData= false to the filer's DeleteEntry RPC so per-chunk DeleteFile RPCs are skipped — the volume server reclaims chunks on its own at TTL drop. Non-lifecycle callers (DELETE handlers, batch delete) pass false to preserve today's eager-chunk-delete behavior; only the lifecycle handler in the next commit will pass true. * feat(s3/lifecycle): metadata-only delete when entry TtlSec > 0 Per-write TTL stamping (PR 9377) sets Attributes.TtlSec on every lifecycle-fitting entry. When the live entry the LifecycleDelete handler fetched carries TtlSec > 0 the volume server is guaranteed to reclaim chunks at TTL drop, so the filer can skip per-chunk DeleteFile RPCs and just remove the entry record. lifecycleDispatch now computes metadataOnly from the live entry and threads it through the unversioned, suspended-null, and noncurrent/expired-marker delete paths. createDeleteMarker is unaffected — it creates a marker, never deletes chunks.
This commit is contained in:
@@ -54,6 +54,12 @@ func (s3a *S3ApiServer) LifecycleDelete(ctx context.Context, req *s3_lifecycle_p
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest, entry *filer_pb.Entry) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) {
|
||||
// metadataOnly: skip per-chunk DeleteFile RPCs because the volume's TTL
|
||||
// will reclaim chunks on its own. Per-write TTL stamping (PR 9377) sets
|
||||
// Attributes.TtlSec on every entry whose lifecycle rule fits within
|
||||
// volume TTL — observing a non-zero TtlSec on the live entry is the
|
||||
// authoritative signal.
|
||||
metadataOnly := entry != nil && entry.Attributes != nil && entry.Attributes.TtlSec > 0
|
||||
switch req.ActionKind {
|
||||
case s3_lifecycle_pb.ActionKind_EXPIRATION_DAYS, s3_lifecycle_pb.ActionKind_EXPIRATION_DATE:
|
||||
// Current-version expiration: Enabled -> delete marker; Suspended
|
||||
@@ -74,7 +80,7 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle
|
||||
return done(), nil
|
||||
case s3_constants.VersioningSuspended:
|
||||
// Best-effort null delete; NotFound is benign.
|
||||
if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, "null"); err != nil {
|
||||
if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, "null", metadataOnly); err != nil {
|
||||
if !errors.Is(err, filer_pb.ErrNotFound) && !errors.Is(err, ErrVersionNotFound) {
|
||||
return retryLater("TRANSPORT_ERROR: deleteNullVersion: " + err.Error()), nil
|
||||
}
|
||||
@@ -85,7 +91,7 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle
|
||||
return done(), nil
|
||||
default:
|
||||
err := s3a.WithFilerClient(false, func(c filer_pb.SeaweedFilerClient) error {
|
||||
return s3a.deleteUnversionedObjectWithClient(c, req.Bucket, req.ObjectPath)
|
||||
return s3a.deleteUnversionedObjectWithClient(c, req.Bucket, req.ObjectPath, metadataOnly)
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrObjectNotFound) {
|
||||
@@ -129,7 +135,7 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle
|
||||
return outcome, err
|
||||
}
|
||||
}
|
||||
if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, req.VersionId); err != nil {
|
||||
if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, req.VersionId, metadataOnly); err != nil {
|
||||
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrObjectNotFound) {
|
||||
return noopResolved("NOT_FOUND_AT_DELETE"), nil
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ func (s3a *S3ApiServer) deleteVersionedObject(r *http.Request, bucket, object, v
|
||||
glog.V(2).Infof("deleteVersionedObject: object lock check failed for %s/%s version %s: %v", bucket, object, versionId, err)
|
||||
return result, s3err.ErrAccessDenied
|
||||
}
|
||||
if err := s3a.deleteSpecificObjectVersion(bucket, object, versionId); err != nil {
|
||||
if err := s3a.deleteSpecificObjectVersion(bucket, object, versionId, false); err != nil {
|
||||
glog.Errorf("deleteVersionedObject: failed to delete specific version %s for %s/%s: %v", versionId, bucket, object, err)
|
||||
return result, s3err.ErrInternalError
|
||||
}
|
||||
@@ -148,7 +148,7 @@ func (s3a *S3ApiServer) deleteVersionedObject(r *http.Request, bucket, object, v
|
||||
glog.V(2).Infof("deleteVersionedObject: object lock check failed for %s/%s null version: %v", bucket, object, err)
|
||||
return result, s3err.ErrAccessDenied
|
||||
}
|
||||
if err := s3a.deleteSpecificObjectVersion(bucket, object, "null"); err != nil {
|
||||
if err := s3a.deleteSpecificObjectVersion(bucket, object, "null", false); err != nil {
|
||||
glog.Errorf("deleteVersionedObject: failed to delete null version for %s/%s: %v", bucket, object, err)
|
||||
return result, s3err.ErrInternalError
|
||||
}
|
||||
@@ -166,10 +166,15 @@ func (s3a *S3ApiServer) deleteVersionedObject(r *http.Request, bucket, object, v
|
||||
return result, s3err.ErrInternalError
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) deleteUnversionedObjectWithClient(client filer_pb.SeaweedFilerClient, bucket, object string) error {
|
||||
// deleteUnversionedObjectWithClient removes the bare object entry. When
|
||||
// metadataOnly is true the filer skips per-chunk DeleteFile RPCs and
|
||||
// relies on the volume's natural TTL to reclaim chunks; pass true only
|
||||
// when the entry's Attributes.TtlSec > 0 so the volume is guaranteed to
|
||||
// drop the chunks on its own.
|
||||
func (s3a *S3ApiServer) deleteUnversionedObjectWithClient(client filer_pb.SeaweedFilerClient, bucket, object string, metadataOnly bool) error {
|
||||
target := util.NewFullPath(s3a.bucketDir(bucket), object)
|
||||
dir, name := target.DirAndName()
|
||||
return deleteObjectEntry(client, dir, name, true, false)
|
||||
return deleteObjectEntry(client, dir, name, !metadataOnly, false)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -228,7 +233,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
|
||||
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return s3a.deleteUnversionedObjectWithClient(client, bucket, object)
|
||||
return s3a.deleteUnversionedObjectWithClient(client, bucket, object, false)
|
||||
}); err != nil {
|
||||
glog.Errorf("DeleteObjectHandler: failed to delete %s/%s: %v", bucket, object, err)
|
||||
return s3err.ErrInternalError
|
||||
@@ -370,7 +375,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
|
||||
return s3err.ErrAccessDenied
|
||||
}
|
||||
|
||||
if err := s3a.deleteUnversionedObjectWithClient(client, bucket, object.Key); err != nil {
|
||||
if err := s3a.deleteUnversionedObjectWithClient(client, bucket, object.Key, false); err != nil {
|
||||
glog.Errorf("DeleteMultipleObjectsHandler: failed to delete %s/%s: %v", bucket, object.Key, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
66
weed/s3api/s3api_object_handlers_delete_test.go
Normal file
66
weed/s3api/s3api_object_handlers_delete_test.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDeleteUnversionedObjectWithClient_MetadataOnlySkipsChunkDelete(t *testing.T) {
|
||||
// metadataOnly=true must reach the filer as IsDeleteData=false so the
|
||||
// volume server reclaims chunks via TTL instead of the filer enqueueing
|
||||
// per-chunk DeleteFile RPCs.
|
||||
s3a := &S3ApiServer{option: &S3ApiServerOption{BucketsPath: "/buckets"}}
|
||||
client := &deleteObjectEntryTestClient{}
|
||||
|
||||
err := s3a.deleteUnversionedObjectWithClient(client, "b", "k", true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, client.deleteReq)
|
||||
assert.Equal(t, "/buckets/b", client.deleteReq.Directory)
|
||||
assert.Equal(t, "k", client.deleteReq.Name)
|
||||
assert.False(t, client.deleteReq.IsDeleteData, "metadataOnly must clear IsDeleteData")
|
||||
}
|
||||
|
||||
func TestDeleteUnversionedObjectWithClient_FullDeletePreservesIsDeleteData(t *testing.T) {
|
||||
// Default behavior (metadataOnly=false): filer should still enqueue
|
||||
// chunk deletions as before.
|
||||
s3a := &S3ApiServer{option: &S3ApiServerOption{BucketsPath: "/buckets"}}
|
||||
client := &deleteObjectEntryTestClient{}
|
||||
|
||||
err := s3a.deleteUnversionedObjectWithClient(client, "b", "k", false)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, client.deleteReq)
|
||||
assert.True(t, client.deleteReq.IsDeleteData, "default delete must keep IsDeleteData true")
|
||||
}
|
||||
|
||||
func TestDeleteUnversionedObjectWithClient_FullPathFromBucketsRoot(t *testing.T) {
|
||||
// Sanity: BucketsPath joins to <bucketsPath>/<bucket>/<object> in the
|
||||
// DeleteEntryRequest so the filer can locate the entry. Object keys
|
||||
// with multiple path segments should split into Directory + Name
|
||||
// correctly.
|
||||
s3a := &S3ApiServer{option: &S3ApiServerOption{BucketsPath: "/buckets"}}
|
||||
client := &deleteObjectEntryTestClient{}
|
||||
|
||||
err := s3a.deleteUnversionedObjectWithClient(client, "mybucket", "a/b/c.txt", false)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, client.deleteReq)
|
||||
assert.Equal(t, "/buckets/mybucket/a/b", client.deleteReq.Directory)
|
||||
assert.Equal(t, "c.txt", client.deleteReq.Name)
|
||||
}
|
||||
|
||||
func TestDeleteUnversionedObjectWithClient_PropagatesEntryAttributesIrrelevant(t *testing.T) {
|
||||
// The metadataOnly decision is the caller's responsibility (the
|
||||
// lifecycle handler). This function is dumb plumbing — it must not
|
||||
// inspect the entry itself, only translate the bool to IsDeleteData.
|
||||
s3a := &S3ApiServer{option: &S3ApiServerOption{BucketsPath: "/buckets"}}
|
||||
client := &deleteObjectEntryTestClient{
|
||||
// A response with no error is fine; attributes on a delete are unused.
|
||||
deleteResp: &filer_pb.DeleteEntryResponse{},
|
||||
}
|
||||
|
||||
require.NoError(t, s3a.deleteUnversionedObjectWithClient(client, "b", "k", true))
|
||||
require.NotNil(t, client.deleteReq)
|
||||
assert.False(t, client.deleteReq.IsDeleteData)
|
||||
}
|
||||
@@ -964,8 +964,11 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
// deleteSpecificObjectVersion deletes a specific version of an object
|
||||
func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error {
|
||||
// deleteSpecificObjectVersion deletes a specific version of an object.
|
||||
// metadataOnly=true skips per-chunk DeleteFile RPCs at the filer; only
|
||||
// pass true when the live entry's Attributes.TtlSec > 0 so the volume
|
||||
// reclaims chunks on its own.
|
||||
func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string, metadataOnly bool) error {
|
||||
// Normalize object path to ensure consistency with toFilerPath behavior
|
||||
normalizedObject := s3_constants.NormalizeObjectKey(object)
|
||||
|
||||
@@ -986,7 +989,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
|
||||
}
|
||||
|
||||
// Delete the regular file
|
||||
deleteErr := s3a.rmObject(bucketDir, normalizedObject, true, false)
|
||||
deleteErr := s3a.rmObject(bucketDir, normalizedObject, !metadataOnly, false)
|
||||
if deleteErr != nil {
|
||||
// Check if file was already deleted by another process
|
||||
if _, checkErr := s3a.getEntry(bucketDir, normalizedObject); checkErr != nil {
|
||||
@@ -1013,7 +1016,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
|
||||
// Attempt to delete the version file
|
||||
// Note: We don't check if the file exists first to avoid race conditions
|
||||
// The deletion operation should be idempotent
|
||||
deleteErr := s3a.rm(versionsDir, versionFile, true, false)
|
||||
deleteErr := s3a.rm(versionsDir, versionFile, !metadataOnly, false)
|
||||
if deleteErr != nil {
|
||||
// Check if file was already deleted by another process (race condition handling)
|
||||
if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
|
||||
|
||||
Reference in New Issue
Block a user