refactor(s3api): strip back-stamp from PutBucketLifecycleConfiguration (Phase 4) (#9367)

* refactor(s3api): strip back-stamp from PutBucketLifecycleConfiguration

The handler used to walk every existing entry under the rule's prefix
and stamp entry.Attributes.TtlSec + the SeaweedFSExpiresS3 flag so that
the filer's compaction filter would expire them. With the event-driven
lifecycle worker live, that retroactive walk is redundant — the worker
drives expiration off the meta-log and a one-time bootstrap scan, so a
PUT lifecycle stays O(rules) instead of O(objects).

New writes still inherit TTL from the filer.conf location entry above;
that volume-routing path is unchanged here and will move to an explicit
operator command later (Phase 11).

Drops updateEntriesTTL + processDirectoryTTL + processTTLBatch +
updateEntryTTL from filer_util.go.

* fix(s3api): clear stale lifecycle TTL entries on PUT

PutBucketLifecycleConfiguration only ever appended or updated filer.conf
entries — it never cleared entries whose rule the operator had removed,
whose prefix had been renamed, that had been disabled or given a tag
filter, or whose bucket had been switched to versioning (leaving the
fast path). The stale day-TTL kept routing new writes (and would expire
old ones if any landed under the prefix) after the policy was updated.

Treat PUT as a full replacement: walk this bucket's existing day-TTL
entries, clear them, then add fresh entries from the new rule set.

* test(command): bump mini default plugin job-type count to 7

The s3_lifecycle plugin handler registered in #9362 is the seventh
default; the test still asserted six.

* fix(s3api): delete stale lifecycle PathConf instead of blanking Ttl

Just clearing pathConf.Ttl leaves the rule's Collection, Replication,
and VolumeGrowthCount in place, so new writes still match the stale
prefix and inherit outdated routing/placement. Use
fc.DeleteLocationConf so the lifecycle-owned PathConf goes away
entirely. Same fix in DeleteBucketLifecycleHandler, which had the
same bug.
This commit is contained in:
Chris Lu
2026-05-08 11:03:03 -07:00
committed by GitHub
parent 5d43f84df7
commit 8b87ceb0d1
3 changed files with 29 additions and 123 deletions

View File

@@ -15,7 +15,7 @@ func TestMiniDefaultPluginJobTypes(t *testing.T) {
if err != nil {
t.Fatalf("buildPluginWorkerHandlers(mini default) err = %v", err)
}
if len(handlers) != 6 {
t.Fatalf("expected mini default job types to include 6 handlers, got %d", len(handlers))
if len(handlers) != 7 {
t.Fatalf("expected mini default job types to include 7 handlers, got %d", len(handlers))
}
}

View File

@@ -188,110 +188,6 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_
return err
}
// updateEntriesTTL stamps ttlSec (plus the SeaweedFS S3-expiry flag) onto
// every file under parentDirectoryPath, walking subdirectories breadth-first.
//
// One filer client connection is reused for the entire walk: directories are
// pulled from an in-memory work queue rather than recursing through
// WithFilerClient, which would open a fresh connection per subdirectory.
// Per-entry/per-directory failures are collected and joined so a single bad
// entry does not abort the rest of the walk.
func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error {
// Use iterative approach with a queue to avoid recursive WithFilerClient calls
// which would create a new connection for each subdirectory
return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
var updateErrors []error
dirsToProcess := []string{parentDirectoryPath}
for len(dirsToProcess) > 0 {
// Pop the next directory (FIFO — breadth-first order).
dir := dirsToProcess[0]
dirsToProcess = dirsToProcess[1:]
// Process directory in paginated batches
if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil {
updateErrors = append(updateErrors, err)
}
}
if len(updateErrors) > 0 {
// Surface every accumulated failure at once instead of only the first.
return errors.Join(updateErrors...)
}
return nil
})
}
// processDirectoryTTL processes a single directory in paginated batches of
// filer.PaginationSize entries. Subdirectories found while listing are queued
// onto dirsToProcess for the caller's breadth-first walk; per-entry update
// failures are appended to updateErrors. A listing error aborts this
// directory and is returned wrapped with the directory path.
func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error {
const batchSize = filer.PaginationSize
// Resume point for pagination: name of the last entry of the previous batch.
startFrom := ""
for {
lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors)
if err != nil {
return fmt.Errorf("list entries in %s: %w", dir, err)
}
// If we got fewer entries than batch size, we've reached the end
if entryCount < batchSize {
break
}
startFrom = lastEntryName
}
return nil
}
// processTTLBatch processes a single batch of entries: it lists one page of
// dir starting from startFrom, queues directory entries onto dirsToProcess,
// and stamps the TTL + S3-expiry flag on file entries via updateEntryTTL.
// It returns the name of the last entry seen (the next page's resume point)
// and the number of entries in the page.
// NOTE(review): assumes SeaweedList's startFrom is exclusive; if it were
// inclusive, the last entry of each page would be re-processed — confirm
// against filer_pb.SeaweedList.
func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient,
dir string, ttlSec int32, startFrom string, batchSize uint32,
dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) {
err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
lastEntry = entry.Name
count++
if entry.IsDirectory {
// Defer subdirectories to the caller's queue; do not recurse here.
*dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name)))
return nil
}
// Update entry TTL and S3 expiry flag
if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil {
// Record the failure but keep processing the rest of the batch.
*updateErrors = append(*updateErrors, updateErr)
}
return nil
}, startFrom, false, batchSize)
return lastEntry, count, err
}
// updateEntryTTL updates a single entry's TTL and S3 expiry flag.
// It stamps entry.Attributes.TtlSec = ttlSec and sets the
// SeaweedFSExpiresS3 extended attribute to "true", then persists the entry
// via UpdateEntry. The write is skipped entirely when both values are
// already in place, so repeated lifecycle PUTs are idempotent per entry.
func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
dir string, entry *filer_pb.Entry, ttlSec int32) error {
// Lazily materialize the nested structures we are about to write into.
if entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
}
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
// Check if both TTL and S3 expiry flag are already set correctly
flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true"
if entry.Attributes.TtlSec == ttlSec && flagAlreadySet {
return nil // Already up to date
}
// Set the S3 expiry flag
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
// Update TTL if needed
if entry.Attributes.TtlSec != ttlSec {
entry.Attributes.TtlSec = ttlSec
}
if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
}); err != nil {
// Include the full path so joined errors from the walk stay attributable.
return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err)
}
return nil
}
func (s3a *S3ApiServer) getCollectionName(bucket string) string {
if s3a.option.FilerGroup != "" {
return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket)

View File

@@ -14,8 +14,6 @@ import (
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
@@ -956,6 +954,24 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
collectionTtls := fc.GetCollectionTtls(collectionName)
changed := false
// PUT replaces the entire lifecycle policy, so any day-TTL filer.conf
// entry left over from a previous PUT (rule removed, prefix changed,
// rule disabled, gained a tag/size filter, bucket switched to
// versioned) must be removed first — otherwise a stale entry keeps
// routing new writes to the old collection/replication and the volume
// server keeps expiring objects under the prior TTL.
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
for prefix, ttl := range collectionTtls {
if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") {
continue
}
fc.DeleteLocationConf(prefix)
changed = true
}
// Re-read after removing so the per-rule dedupe below sees the freshly
// cleared state, not the snapshot from before the loop above.
collectionTtls = fc.GetCollectionTtls(collectionName)
// Check whether the bucket has versioning enabled. Versioned buckets must
// NOT use the TTL fast-path because:
// 1. TTL volumes expire as a unit, destroying all data — including
@@ -1035,13 +1051,10 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
glog.V(2).Infof("Start updating TTL for %s", locationPrefix)
if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr)
} else {
glog.V(2).Infof("Finished updating TTL for %s", locationPrefix)
}
// Existing entries are not back-stamped here; the lifecycle worker
// drives expiration off the meta-log and bootstrap walk so a PUT
// stays O(rules) instead of O(objects). New writes inherit TTL from
// the filer.conf entry above.
changed = true
}
@@ -1088,16 +1101,13 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h
}
collectionTtls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
changed := false
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
for prefix, ttl := range collectionTtls {
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
if strings.HasPrefix(prefix, bucketPrefix) && strings.HasSuffix(ttl, "d") {
pathConf, found := fc.GetLocationConf(prefix)
if found {
pathConf.Ttl = ""
fc.SetLocationConf(pathConf)
}
changed = true
if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") {
continue
}
fc.DeleteLocationConf(prefix)
changed = true
}
if changed {