refactor(s3api): strip back-stamp from PutBucketLifecycleConfiguration (Phase 4) (#9367)
* refactor(s3api): strip back-stamp from PutBucketLifecycleConfiguration

  The handler used to walk every existing entry under the rule's prefix and stamp entry.Attributes.TtlSec + the SeaweedFSExpiresS3 flag so that the filer's compaction filter would expire them. With the event-driven lifecycle worker live, that retroactive walk is redundant — the worker drives expiration off the meta-log and a one-time bootstrap scan, so a PUT lifecycle stays O(rules) instead of O(objects). New writes still inherit TTL from the filer.conf location entry above; that volume-routing path is unchanged here and will move to an explicit operator command later (Phase 11). Drops updateEntriesTTL + processDirectoryTTL + processTTLBatch + updateEntryTTL from filer_util.go.

* fix(s3api): clear stale lifecycle TTL entries on PUT

  PutBucketLifecycleConfiguration only ever appended/updated filer.conf entries — it never cleared ones whose rule was removed, had its prefix renamed, was disabled, gained a tag filter, or whose bucket was versioned out of the fast path. The stale day-TTL kept routing new writes (and would expire old ones, if any landed under the prefix) after the policy was updated. Treat PUT as a full replacement: walk this bucket's existing day-TTL entries, clear them, then add fresh entries from the new rule set.

* test(command): bump mini default plugin job-type count to 7

  The s3_lifecycle plugin handler registered in #9362 is the seventh default; the test still asserted six.

* fix(s3api): delete stale lifecycle PathConf instead of blanking Ttl

  Just clearing pathConf.Ttl leaves the rule's Collection, Replication, and VolumeGrowthCount in place, so new writes still match the stale prefix and inherit outdated routing/placement. Use fc.DeleteLocationConf so the lifecycle-owned PathConf goes away entirely. The same fix applies in DeleteBucketLifecycleHandler, which had the same bug.
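To see why the fourth fix matters, here is a toy, self-contained Go sketch contrasting blanking Ttl with deleting the whole entry. It is not SeaweedFS code: the real struct is filer_pb.FilerConf_PathConf and the real deletion is fc.DeleteLocationConf; the path and values are made up.

	package main

	import "fmt"

	// pathConf is a toy stand-in for filer_pb.FilerConf_PathConf; the real
	// struct also carries Replication and VolumeGrowthCount, which is exactly
	// why blanking Ttl alone is not enough.
	type pathConf struct {
		Collection string
		Ttl        string
	}

	func main() {
		// filer.conf modeled as location-prefix -> PathConf.
		conf := map[string]pathConf{
			"/buckets/media/logs/": {Collection: "media", Ttl: "7d"},
		}

		// Old behavior: blank the Ttl. The PathConf still exists, so new
		// writes under /buckets/media/logs/ keep matching it and inherit
		// its (possibly stale) Collection and placement settings.
		e := conf["/buckets/media/logs/"]
		e.Ttl = ""
		conf["/buckets/media/logs/"] = e
		fmt.Println("after blanking Ttl:", conf)

		// New behavior: drop the PathConf entirely, as fc.DeleteLocationConf
		// does, so nothing stale is left to match new writes.
		delete(conf, "/buckets/media/logs/")
		fmt.Println("after deleting:   ", conf)
	}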
@@ -15,7 +15,7 @@ func TestMiniDefaultPluginJobTypes(t *testing.T) {
 	if err != nil {
 		t.Fatalf("buildPluginWorkerHandlers(mini default) err = %v", err)
 	}
-	if len(handlers) != 6 {
-		t.Fatalf("expected mini default job types to include 6 handlers, got %d", len(handlers))
+	if len(handlers) != 7 {
+		t.Fatalf("expected mini default job types to include 7 handlers, got %d", len(handlers))
 	}
 }
@@ -188,110 +188,6 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_
 	return err
 }
 
-func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error {
-	// Use iterative approach with a queue to avoid recursive WithFilerClient calls
-	// which would create a new connection for each subdirectory
-	return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		ctx := context.Background()
-		var updateErrors []error
-		dirsToProcess := []string{parentDirectoryPath}
-
-		for len(dirsToProcess) > 0 {
-			dir := dirsToProcess[0]
-			dirsToProcess = dirsToProcess[1:]
-
-			// Process directory in paginated batches
-			if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil {
-				updateErrors = append(updateErrors, err)
-			}
-		}
-
-		if len(updateErrors) > 0 {
-			return errors.Join(updateErrors...)
-		}
-		return nil
-	})
-}
-
-// processDirectoryTTL processes a single directory in paginated batches
-func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
-	dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error {
-
-	const batchSize = filer.PaginationSize
-	startFrom := ""
-
-	for {
-		lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors)
-		if err != nil {
-			return fmt.Errorf("list entries in %s: %w", dir, err)
-		}
-
-		// If we got fewer entries than batch size, we've reached the end
-		if entryCount < batchSize {
-			break
-		}
-		startFrom = lastEntryName
-	}
-	return nil
-}
-
-// processTTLBatch processes a single batch of entries
-func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient,
-	dir string, ttlSec int32, startFrom string, batchSize uint32,
-	dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) {
-
-	err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
-		lastEntry = entry.Name
-		count++
-
-		if entry.IsDirectory {
-			*dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name)))
-			return nil
-		}
-
-		// Update entry TTL and S3 expiry flag
-		if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil {
-			*updateErrors = append(*updateErrors, updateErr)
-		}
-		return nil
-	}, startFrom, false, batchSize)
-
-	return lastEntry, count, err
-}
-
-// updateEntryTTL updates a single entry's TTL and S3 expiry flag
-func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
-	dir string, entry *filer_pb.Entry, ttlSec int32) error {
-
-	if entry.Attributes == nil {
-		entry.Attributes = &filer_pb.FuseAttributes{}
-	}
-	if entry.Extended == nil {
-		entry.Extended = make(map[string][]byte)
-	}
-
-	// Check if both TTL and S3 expiry flag are already set correctly
-	flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true"
-	if entry.Attributes.TtlSec == ttlSec && flagAlreadySet {
-		return nil // Already up to date
-	}
-
-	// Set the S3 expiry flag
-	entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
-	// Update TTL if needed
-	if entry.Attributes.TtlSec != ttlSec {
-		entry.Attributes.TtlSec = ttlSec
-	}
-
-	if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
-		Directory: dir,
-		Entry:     entry,
-	}); err != nil {
-		return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err)
-	}
-	return nil
-}
-
 func (s3a *S3ApiServer) getCollectionName(bucket string) string {
 	if s3a.option.FilerGroup != "" {
 		return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket)
@@ -14,8 +14,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/seaweedfs/seaweedfs/weed/util"
-
 	"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
@@ -956,6 +954,24 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
 	collectionTtls := fc.GetCollectionTtls(collectionName)
 	changed := false
 
+	// PUT replaces the entire lifecycle policy, so any day-TTL filer.conf
+	// entry left over from a previous PUT (rule removed, prefix changed,
+	// rule disabled, gained a tag/size filter, bucket switched to
+	// versioned) must be removed first — otherwise a stale entry keeps
+	// routing new writes to the old collection/replication and the volume
+	// server keeps expiring objects under the prior TTL.
+	bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
+	for prefix, ttl := range collectionTtls {
+		if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") {
+			continue
+		}
+		fc.DeleteLocationConf(prefix)
+		changed = true
+	}
+	// Re-read after removing so the per-rule dedupe below sees the freshly
+	// cleared state, not the snapshot from before the loop above.
+	collectionTtls = fc.GetCollectionTtls(collectionName)
+
 	// Check whether the bucket has versioning enabled. Versioned buckets must
 	// NOT use the TTL fast-path because:
 	// 1. TTL volumes expire as a unit, destroying all data — including
@@ -1035,13 +1051,10 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
 			return
 		}
-		ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
-		glog.V(2).Infof("Start updating TTL for %s", locationPrefix)
-		if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil {
-			glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr)
-		} else {
-			glog.V(2).Infof("Finished updating TTL for %s", locationPrefix)
-		}
+		// Existing entries are not back-stamped here; the lifecycle worker
+		// drives expiration off the meta-log and bootstrap walk so a PUT
+		// stays O(rules) instead of O(objects). New writes inherit TTL from
+		// the filer.conf entry above.
 		changed = true
 	}
 
@@ -1088,16 +1101,13 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h
 	}
 	collectionTtls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
 	changed := false
+	bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
 	for prefix, ttl := range collectionTtls {
-		bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
-		if strings.HasPrefix(prefix, bucketPrefix) && strings.HasSuffix(ttl, "d") {
-			pathConf, found := fc.GetLocationConf(prefix)
-			if found {
-				pathConf.Ttl = ""
-				fc.SetLocationConf(pathConf)
-			}
-			changed = true
-		}
+		if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") {
+			continue
+		}
+		fc.DeleteLocationConf(prefix)
+		changed = true
 	}
 
 	if changed {
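For concreteness, a small runnable Go sketch of the day-TTL matching rule both handlers now share. The paths and TTL values are made up, and the literal "/buckets" stands in for s3a.option.BucketsPath; the condition itself is the one from the diff above.

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		// Hypothetical snapshot of what fc.GetCollectionTtls might return.
		collectionTtls := map[string]string{
			"/buckets/media/logs/":  "7d", // day TTL under this bucket: removed
			"/buckets/media/cache/": "8h", // hour TTL: "d" suffix fails, kept
			"/buckets/other/tmp/":   "1d", // different bucket: prefix fails, kept
		}
		bucketPrefix := "/buckets/media/" // fmt.Sprintf("%s/%s/", BucketsPath, bucket)

		for prefix, ttl := range collectionTtls {
			// Same filter as the handlers: only lifecycle-owned day-TTL
			// entries under this bucket are deleted.
			stale := strings.HasPrefix(prefix, bucketPrefix) && strings.HasSuffix(ttl, "d")
			fmt.Printf("%-23s ttl=%-3s delete=%v\n", prefix, ttl, stale)
		}
	}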