diff --git a/weed/command/mini_plugin_test.go b/weed/command/mini_plugin_test.go index 36e927fa2..f302e01af 100644 --- a/weed/command/mini_plugin_test.go +++ b/weed/command/mini_plugin_test.go @@ -15,7 +15,7 @@ func TestMiniDefaultPluginJobTypes(t *testing.T) { if err != nil { t.Fatalf("buildPluginWorkerHandlers(mini default) err = %v", err) } - if len(handlers) != 6 { - t.Fatalf("expected mini default job types to include 6 handlers, got %d", len(handlers)) + if len(handlers) != 7 { + t.Fatalf("expected mini default job types to include 7 handlers, got %d", len(handlers)) } } diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index ade3992fc..5b22434c3 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -188,110 +188,6 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_ return err } -func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error { - // Use iterative approach with a queue to avoid recursive WithFilerClient calls - // which would create a new connection for each subdirectory - return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - ctx := context.Background() - var updateErrors []error - dirsToProcess := []string{parentDirectoryPath} - - for len(dirsToProcess) > 0 { - dir := dirsToProcess[0] - dirsToProcess = dirsToProcess[1:] - - // Process directory in paginated batches - if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil { - updateErrors = append(updateErrors, err) - } - } - - if len(updateErrors) > 0 { - return errors.Join(updateErrors...) - } - return nil - }) -} - -// processDirectoryTTL processes a single directory in paginated batches -func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, - dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error { - - const batchSize = filer.PaginationSize - startFrom := "" - - for { - lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors) - if err != nil { - return fmt.Errorf("list entries in %s: %w", dir, err) - } - - // If we got fewer entries than batch size, we've reached the end - if entryCount < batchSize { - break - } - startFrom = lastEntryName - } - return nil -} - -// processTTLBatch processes a single batch of entries -func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient, - dir string, ttlSec int32, startFrom string, batchSize uint32, - dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) { - - err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { - lastEntry = entry.Name - count++ - - if entry.IsDirectory { - *dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name))) - return nil - } - - // Update entry TTL and S3 expiry flag - if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil { - *updateErrors = append(*updateErrors, updateErr) - } - return nil - }, startFrom, false, batchSize) - - return lastEntry, count, err -} - -// updateEntryTTL updates a single entry's TTL and S3 expiry flag -func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, - dir string, entry *filer_pb.Entry, ttlSec int32) error { - - if entry.Attributes == nil { - entry.Attributes = &filer_pb.FuseAttributes{} - } - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - - // Check if both TTL and S3 expiry flag are already set correctly - flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true" - if entry.Attributes.TtlSec == ttlSec && flagAlreadySet { - return nil // Already up to date - } - - // Set the S3 expiry flag - entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") - // Update TTL if needed - if entry.Attributes.TtlSec != ttlSec { - entry.Attributes.TtlSec = ttlSec - } - - if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: entry, - }); err != nil { - return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err) - } - return nil -} - func (s3a *S3ApiServer) getCollectionName(bucket string) string { if s3a.option.FilerGroup != "" { return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket) diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 301b385a8..0344664e8 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -14,8 +14,6 @@ import ( "strings" "time" - "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil" "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" @@ -956,6 +954,24 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr collectionTtls := fc.GetCollectionTtls(collectionName) changed := false + // PUT replaces the entire lifecycle policy, so any day-TTL filer.conf + // entry left over from a previous PUT (rule removed, prefix changed, + // rule disabled, gained a tag/size filter, bucket switched to + // versioned) must be removed first — otherwise a stale entry keeps + // routing new writes to the old collection/replication and the volume + // server keeps expiring objects under the prior TTL. + bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) + for prefix, ttl := range collectionTtls { + if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") { + continue + } + fc.DeleteLocationConf(prefix) + changed = true + } + // Re-read after removing so the per-rule dedupe below sees the freshly + // cleared state, not the snapshot from before the loop above. + collectionTtls = fc.GetCollectionTtls(collectionName) + // Check whether the bucket has versioning enabled. Versioned buckets must // NOT use the TTL fast-path because: // 1. TTL volumes expire as a unit, destroying all data — including @@ -1035,13 +1051,10 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds()) - glog.V(2).Infof("Start updating TTL for %s", locationPrefix) - if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil { - glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr) - } else { - glog.V(2).Infof("Finished updating TTL for %s", locationPrefix) - } + // Existing entries are not back-stamped here; the lifecycle worker + // drives expiration off the meta-log and bootstrap walk so a PUT + // stays O(rules) instead of O(objects). New writes inherit TTL from + // the filer.conf entry above. changed = true } @@ -1088,16 +1101,13 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h } collectionTtls := fc.GetCollectionTtls(s3a.getCollectionName(bucket)) changed := false + bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) for prefix, ttl := range collectionTtls { - bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) - if strings.HasPrefix(prefix, bucketPrefix) && strings.HasSuffix(ttl, "d") { - pathConf, found := fc.GetLocationConf(prefix) - if found { - pathConf.Ttl = "" - fc.SetLocationConf(pathConf) - } - changed = true + if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") { + continue } + fc.DeleteLocationConf(prefix) + changed = true } if changed {