seaweedfs/weed/s3api/auth_credentials_subscribe.go
Chris Lu 2458f6c81c feat(s3api): apply lifecycle TTL at write time (#9377)
* feat(s3api): apply lifecycle TTL at write time

The S3 server already has the bucket's lifecycle XML at PUT time (via
the cached BucketConfig), so volume-TTL routing is just a per-write
decision instead of something that needs a separate filer.conf
projection kept in sync via operator commands.

- BucketConfig caches the canonical Rules parsed from the lifecycle
  XML once on load (BucketConfigCache invalidates on Put/Delete
  Lifecycle, so the rules stay current automatically).
- resolveLifecycleTTLForWrite walks the cached rules: longest-prefix
  match, applies tag and size filters against the request, returns
  Days * 86400. Versioned buckets, non-Expiration.Days rules, and
  unevaluable size filters (no Content-Length) yield 0 — the
  lifecycle worker handles those at scan time.
- putToFiler resolves TTL once and passes it through both the
  AssignVolumeRequest (so chunks land on a TTL volume) and the new
  entry's Attributes.TtlSec (so the filer's RocksDB compaction also
  expires the metadata).

Lifecycle XML PUT/DELETE now influences write routing immediately —
no operator command, no filer.conf bookkeeping. The lifecycle worker
remains authoritative for the cases the fast path can't cover (existing
objects via bootstrap, versioned buckets, noncurrent retention,
abort-MPU, tag/size filters that didn't hold at PUT time).

CompleteMultipartUpload and CopyObject still need wiring; left for
follow-ups so this PR stays scoped.

* perf(s3api): pre-filter and sort lifecycle rules for the per-PUT TTL walk

resolveLifecycleTTLForWrite walked every lifecycle rule on every
PutObject, including disabled / non-Expiration.Days rules that could
never fire on the fast path, and computed "longest prefix wins" via a
running max instead of an early exit.

Cache a pre-filtered + pre-sorted slice in BucketConfig:
- buildTTLFastPathRules drops everything except Status=Enabled +
  ExpirationDays>0;
- sorts by descending prefix length (stable, so equal-length rules
  keep their XML order).

The resolver returns on first prefix+filter match. A bucket whose
lifecycle XML has no Expiration.Days rules is now O(1); a typical
bucket with one Expiration.Days rule walks one HasPrefix per PUT.

The cache is built once per bucket-config load. PutBucketLifecycle /
DeleteBucketLifecycle already invalidate the cache, so the fast-path
slice stays current automatically.
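A minimal sketch of the pre-filter and sort described above, assuming a
simplified rule shape. `lifecycleRule`, this signature of
`buildTTLFastPathRules`, and `firstMatchTTL` are illustrative stand-ins,
not the SeaweedFS code. Only the filter (Status=Enabled, ExpirationDays>0),
the stable descending-prefix-length sort, and the early exit come from
this message.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// lifecycleRule is a hypothetical, simplified view of one parsed lifecycle rule.
type lifecycleRule struct {
	Status         string // "Enabled" or "Disabled"
	Prefix         string
	ExpirationDays int
}

// buildTTLFastPathRules keeps only rules the write-time fast path can act on
// (Enabled with a positive Expiration.Days) and sorts them by descending
// prefix length, so the first match is the longest prefix.
func buildTTLFastPathRules(rules []lifecycleRule) []lifecycleRule {
	var out []lifecycleRule
	for _, r := range rules {
		if r.Status == "Enabled" && r.ExpirationDays > 0 {
			out = append(out, r)
		}
	}
	// Stable: rules with equal-length prefixes keep their XML order.
	sort.SliceStable(out, func(i, j int) bool {
		return len(out[i].Prefix) > len(out[j].Prefix)
	})
	return out
}

// firstMatchTTL returns the TTL in seconds of the first rule whose prefix
// matches key, or 0 when none does. No running max is needed.
func firstMatchTTL(rules []lifecycleRule, key string) int64 {
	for _, r := range rules {
		if strings.HasPrefix(key, r.Prefix) {
			return int64(r.ExpirationDays) * 86400
		}
	}
	return 0
}

func main() {
	rules := buildTTLFastPathRules([]lifecycleRule{
		{Status: "Enabled", Prefix: "", ExpirationDays: 30},
		{Status: "Disabled", Prefix: "tmp/", ExpirationDays: 1},
		{Status: "Enabled", Prefix: "logs/", ExpirationDays: 7},
	})
	fmt.Println(len(rules), firstMatchTTL(rules, "logs/a.txt"), firstMatchTTL(rules, "tmp/x"))
}
```

Because `sort.SliceStable` preserves input order for equal keys, ties
between equal-length prefixes resolve deterministically in XML order.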

* refactor(s3api): LifecycleTTLResolver object + four review fixes

Pulls the per-PUT TTL resolution into a dedicated type so the bucket
config holds one object instead of a slice + magic-walk function:

- LifecycleTTLResolver wraps the pre-filtered, pre-sorted rules.
  Its Resolve method is nil-safe, so the call site doesn't have to
  special-case buckets with no eligible rules.
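The nil-safe pattern relies on Go allowing method calls on a nil pointer
receiver. In this sketch the resolver is pared down to a single
bucket-wide TTL; that field and the `bucketConfig` type are hypothetical,
and only the type name and the nil-safety idea come from the message.

```go
package main

import "fmt"

// LifecycleTTLResolver is a pared-down, hypothetical stand-in for the type
// named in the commit: a single bucket-wide TTL replaces the real rule slice.
type LifecycleTTLResolver struct {
	ttlSec int32
}

// Resolve is safe to call on a nil receiver, so a bucket with no eligible
// rules can carry a nil resolver and every caller still gets 0 (no TTL).
func (r *LifecycleTTLResolver) Resolve(key string, size int64) int32 {
	if r == nil {
		return 0
	}
	return r.ttlSec
}

// bucketConfig is a hypothetical cache entry holding the resolver pointer.
type bucketConfig struct {
	LifecycleTTL *LifecycleTTLResolver
}

func main() {
	var noRules bucketConfig // LifecycleTTL is nil
	withRule := bucketConfig{LifecycleTTL: &LifecycleTTLResolver{ttlSec: 86400}}
	// The call site is identical for both; no nil check is needed.
	fmt.Println(noRules.LifecycleTTL.Resolve("a.txt", 10), withRule.LifecycleTTL.Resolve("a.txt", 10))
}
```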

Four review findings:

1. (high) drop tag-filtered rules from the fast path. Tags are mutable
   post-PUT via PutObjectTagging but volume TTL is irreversible — an
   object that matched at write time would still expire after the tag
   was removed. Worker re-evaluates current tags at scan time. Fast
   path now keeps only stable predicates: prefix and size.

2. (high) move TTL resolution out of putToFiler. MPU parts, copy-part
   destinations, and other transient writes called putToFiler with
   object="" — bucket-wide rules (empty Prefix) matched and bound a
   TTL clock starting at part-upload time, before
   CompleteMultipartUpload existed. putToFiler now takes an explicit
   ttlSec parameter; only the user-visible PutObject paths
   (PutObjectHandler, postpolicy) feed it from the resolver. MPU and
   copy-part pass 0.

3. (medium) AWS overlapping-rule precedence is "shorter expiration
   wins", not "longest prefix wins". Sort by ExpirationDays ascending
   so the first prefix match is also the shortest applicable rule.

4. (medium) overflow no longer caps at math.MaxInt32 seconds (~68y).
   A longer policy would have expired early. Return 0 instead so the
   worker enforces the actual policy on its own schedule.
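Findings 3 and 4 can be sketched together under simplified types.
`expirationRule`, `sortShortestFirst`, `daysToTTLSec`, and `resolve` are
illustrative names, not the SeaweedFS API. The sketch reflects the behavior
after this commit; a later commit in the series drops overflowing rules at
construction instead of returning 0 per call.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"strings"
)

// expirationRule is a hypothetical fast-path rule that already passed the
// Enabled / Expiration.Days filter.
type expirationRule struct {
	Prefix string
	Days   int64
}

// sortShortestFirst orders rules by ascending Days (stable), so the first
// prefix match is also the shortest applicable expiration.
func sortShortestFirst(rules []expirationRule) []expirationRule {
	out := append([]expirationRule(nil), rules...)
	sort.SliceStable(out, func(i, j int) bool { return out[i].Days < out[j].Days })
	return out
}

// daysToTTLSec converts days to seconds. A result that would not fit in an
// int32 returns 0 (no volume TTL) instead of being capped at math.MaxInt32,
// so a long policy is never expired early; the worker enforces it instead.
func daysToTTLSec(days int64) int32 {
	if days <= 0 || days > math.MaxInt32/86400 {
		return 0
	}
	return int32(days * 86400)
}

// resolve returns the TTL of the first (shortest) matching rule, or 0.
func resolve(sorted []expirationRule, key string) int32 {
	for _, r := range sorted {
		if strings.HasPrefix(key, r.Prefix) {
			return daysToTTLSec(r.Days)
		}
	}
	return 0
}

func main() {
	rules := sortShortestFirst([]expirationRule{
		{Prefix: "logs/", Days: 30},
		{Prefix: "", Days: 7},
	})
	// The bucket-wide 7-day rule wins over the longer-prefix 30-day rule.
	fmt.Println(resolve(rules, "logs/app.log"), daysToTTLSec(100000))
}
```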

Versioning gate moves into the resolver constructor: versioned
buckets get a nil resolver. All five putToFiler callers are updated:
PutObjectHandler and postpolicy resolve via lifecycleTTLForObjectWrite,
suspended/versioned wrappers pass 0 by construction, and MPU part and
copy-part SSE pass 0 with a one-line comment explaining why.

* refactor(s3api): drop unused BucketConfig.LifecycleRules field

The full canonical rule set was set on every bucket-config load but
never read — resolveLifecycleTTLForWrite worked off the resolver's
filtered slice, and the lifecycle worker reads bucket entries straight
off the meta-log instead of this cache. Remove the field and its
s3lifecycle import.

* perf(s3api): pre-compute LifecycleTTLResolver hot-path fields

Resolve was doing per-call work that's actually constant per bucket-
config load: int64 multiplication, max-int32 overflow check, field
indirections through *s3lifecycle.Rule. Move it to the constructor
and pack the rule into a compact ttlRule (prefix + ttlSec int32 +
sizeGT/sizeLT) so the inner loop is HasPrefix → optional size check
→ return.

Drop overflowing rules at construction rather than handling per-
resolve: capping would expire long policies early, and returning 0
in the inner loop would prevent any shorter overlapping rule from
firing. Drop-at-construction composes correctly with the ascending
sort.
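The compact layout and construction-time drop can be sketched as follows.
`ttlRule` and its prefix/ttlSec/sizeGT/sizeLT fields are named in the
message; `ruleSpec`, `ttlResolver`, `newTTLResolver`, and the exact
size-filter semantics (exclusive bounds, 0 meaning unset, negative size
meaning unknown) are assumptions for illustration.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"strings"
)

// ttlRule follows the compact shape described in the commit; the bound
// semantics are assumed: exclusive, like S3 ObjectSizeGreaterThan/LessThan.
type ttlRule struct {
	prefix string
	ttlSec int32
	sizeGT int64 // 0 = no lower bound
	sizeLT int64 // 0 = no upper bound
}

// ruleSpec is a hypothetical input shape for the constructor.
type ruleSpec struct {
	Prefix         string
	ExpirationDays int64
	SizeGT, SizeLT int64
}

type ttlResolver struct{ rules []ttlRule }

// newTTLResolver does all per-bucket work once: days to seconds, overflow
// check, ascending sort. Rules whose TTL does not fit in int32 are dropped.
func newTTLResolver(specs []ruleSpec) *ttlResolver {
	var rules []ttlRule
	for _, s := range specs {
		if s.ExpirationDays <= 0 || s.ExpirationDays > math.MaxInt32/86400 {
			continue // ineligible or overflowing: left to the lifecycle worker
		}
		rules = append(rules, ttlRule{
			prefix: s.Prefix,
			ttlSec: int32(s.ExpirationDays * 86400),
			sizeGT: s.SizeGT,
			sizeLT: s.SizeLT,
		})
	}
	if len(rules) == 0 {
		return nil
	}
	sort.SliceStable(rules, func(i, j int) bool { return rules[i].ttlSec < rules[j].ttlSec })
	return &ttlResolver{rules: rules}
}

// Resolve: HasPrefix, then an optional size check, then return.
// size < 0 means unknown, so size-filtered rules are skipped.
func (r *ttlResolver) Resolve(key string, size int64) int32 {
	if r == nil {
		return 0
	}
	for i := range r.rules {
		rule := &r.rules[i]
		if !strings.HasPrefix(key, rule.prefix) {
			continue
		}
		if rule.sizeGT > 0 || rule.sizeLT > 0 {
			if size < 0 || (rule.sizeGT > 0 && size <= rule.sizeGT) || (rule.sizeLT > 0 && size >= rule.sizeLT) {
				continue
			}
		}
		return rule.ttlSec
	}
	return 0
}

func main() {
	r := newTTLResolver([]ruleSpec{
		{Prefix: "", ExpirationDays: 100000},                 // overflows int32 seconds: dropped
		{Prefix: "big/", ExpirationDays: 1, SizeGT: 1 << 20}, // only objects larger than 1 MiB
		{Prefix: "", ExpirationDays: 30},
	})
	fmt.Println(len(r.rules), r.Resolve("big/x", 2<<20), r.Resolve("big/x", 10), r.Resolve("big/x", -1))
}
```

With overflowing rules never entering the slice, the inner loop needs no
overflow branch at all.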

Benchmarks (Apple M4):
  NilReceiver           0.99 ns/op   0 B/op
  OneRuleMatching       2.75 ns/op   0 B/op
  FiveRulesNoMatch     13.5  ns/op   0 B/op

* fix(s3api): refresh LifecycleTTL resolver on bucket-config update

storeBucketLifecycleConfiguration writes to Entry.Extended via
updateBucketConfig, which clones the cached BucketConfig and calls
the user fn, then caches the result. The clone inherits the prior
LifecycleTTL pointer and nothing rebuilt it from the new XML, so
add/replace/delete of a lifecycle policy left the wrong resolver in
cache until eviction. Same gap on the meta-log side: peer-driven
updates flowed through updateBucketConfigCacheFromEntry without
re-deriving the resolver.

Centralize the Entry -> derived-field mapping in one helper that
resets every Extended-backed field then repopulates from the entry,
and call it from getBucketConfig (initial load), updateBucketConfig
(after updateEntry succeeds, before caching), and
updateBucketConfigCacheFromEntry (meta-log path). Reset is the
load-bearing part: deleting the lifecycle XML must yield a nil
resolver, since stamping a stale TTL onto subsequent writes is
irreversible.
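The reset-then-repopulate rule can be sketched with a toy config.
`bucketConfig`, `populateDerivedFields`, `deleteLifecycle`, and the
Extended key names are hypothetical; the real helper is
populateBucketConfigDerivedFields. The point is ordering: reset every
derived field, then rebuild from the entry, on every path that caches.

```go
package main

import "fmt"

// Hypothetical Extended keys; the real BucketConfig has many more
// Extended-backed fields.
const (
	extLifecycleKey  = "lifecycle-xml"
	extObjectLockKey = "object-lock"
)

type resolver struct{ xml string }

type bucketConfig struct {
	Extended     map[string][]byte
	LifecycleTTL *resolver
	ObjectLock   bool
}

// populateDerivedFields is the single Extended-to-cached-field mapping. It
// resets every derived field first, so a key removed from Extended cannot
// leave behind a value inherited from a cloned config.
func populateDerivedFields(c *bucketConfig) {
	c.LifecycleTTL = nil
	c.ObjectLock = false
	if xml := c.Extended[extLifecycleKey]; len(xml) > 0 {
		c.LifecycleTTL = &resolver{xml: string(xml)} // stand-in for parsing rules
	}
	if _, ok := c.Extended[extObjectLockKey]; ok {
		c.ObjectLock = true
	}
}

// deleteLifecycle simulates the clone-mutate-cache flow of a
// DeleteBucketLifecycle call against a cached config.
func deleteLifecycle(cached *bucketConfig) *bucketConfig {
	clone := *cached // inherits the old LifecycleTTL pointer
	clone.Extended = map[string][]byte{}
	for k, v := range cached.Extended {
		if k != extLifecycleKey {
			clone.Extended[k] = v
		}
	}
	populateDerivedFields(&clone) // without this, the stale resolver survives
	return &clone
}

func main() {
	cached := &bucketConfig{Extended: map[string][]byte{extLifecycleKey: []byte("<LifecycleConfiguration/>")}}
	populateDerivedFields(cached)
	updated := deleteLifecycle(cached)
	fmt.Println(cached.LifecycleTTL != nil, updated.LifecycleTTL == nil)
}
```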

* fix(s3api): PostPolicy passes object size, not multipart wire size

lifecycleTTLForObjectWrite was reading r.ContentLength, which on the
PostPolicy path is the multipart envelope (form fields + boundaries),
not the uploaded object body. A size-filtered rule would evaluate
against that inflated total and stamp (or skip) a TTL the policy
didn't intend.

Take the object size as an explicit parameter. PutObject still passes
r.ContentLength (correct there); PostPolicy passes the fileSize
already extracted from the form part. Negative size means unknown
and continues to skip any size-filtered rule.

* fix(s3api): treat Object Lock as versioned for lifecycle TTL fast path

Object Lock requires versioning at the API level, but it can be
enabled at create time without S3 ever writing the explicit
Versioning header. The lifecycle resolver construction site only
checked Versioning, so an Object-Lock bucket with no Versioning byte
would still get a fast-path resolver and stamp volume TTL onto
writes — destroying noncurrent versions when the volume expires.

Mirror the OR already used in BucketIsVersioned: ObjectLockConfig
non-nil counts as versioned for resolver construction. Existing
explicit-Versioning paths are unchanged.
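The construction-site gate reduces to a small predicate. In this sketch
`bucketState`, `objectLockConfig`, `treatedAsVersioned`, and
`newResolverForBucket` are hypothetical, and treating any non-empty
versioning status (including Suspended) as versioned is an assumption;
only the Versioning-or-ObjectLockConfig OR comes from the message.

```go
package main

import "fmt"

// Hypothetical, simplified bucket state. Versioning is the explicit status
// ("" when never set); ObjectLockConfig is non-nil when Object Lock was
// enabled, e.g. at bucket creation.
type objectLockConfig struct{ Mode string }

type bucketState struct {
	Versioning       string
	ObjectLockConfig *objectLockConfig
}

type ttlResolver struct{ ttlSec int32 }

// treatedAsVersioned: explicit versioning, or Object Lock (which implies
// versioning even if the Versioning value was never written).
func treatedAsVersioned(b bucketState) bool {
	return b.Versioning != "" || b.ObjectLockConfig != nil
}

// newResolverForBucket returns nil for versioned or Object Lock buckets, so
// no volume TTL is stamped on their writes.
func newResolverForBucket(b bucketState, ttlSec int32) *ttlResolver {
	if treatedAsVersioned(b) || ttlSec <= 0 {
		return nil
	}
	return &ttlResolver{ttlSec: ttlSec}
}

func main() {
	lockOnly := bucketState{ObjectLockConfig: &objectLockConfig{Mode: "GOVERNANCE"}}
	plain := bucketState{}
	fmt.Println(newResolverForBucket(lockOnly, 86400) == nil, newResolverForBucket(plain, 86400) != nil)
}
```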
2026-05-08 21:35:27 -07:00


package s3api

import (
	"context"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

const oidcProvidersDir = filer.IamConfigDirectory + "/oidc-providers"

func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, prefix string, directoriesToWatch []string) {
	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification

		// For rename/move operations, NewParentPath contains the destination directory.
		// We process both source and destination dirs so moves out of watched
		// directories (e.g., IAM config dirs) are not missed.
		dir := resp.Directory
		if message.NewParentPath != "" {
			dir = message.NewParentPath
		}

		// Handle all metadata changes (create, update, delete, rename)
		// These handlers check for nil entries internally
		_ = s3a.onBucketMetadataChange(dir, message.OldEntry, message.NewEntry)
		_ = s3a.onIamConfigChange(dir, message.OldEntry, message.NewEntry)
		_ = s3a.onOIDCProviderChange(dir, message.OldEntry, message.NewEntry)
		_ = s3a.onCircuitBreakerConfigChange(dir, message.OldEntry, message.NewEntry)

		// For moves across directories, replay a delete event for the source directory
		if message.NewParentPath != "" && resp.Directory != message.NewParentPath {
			_ = s3a.onBucketMetadataChange(resp.Directory, message.OldEntry, nil)
			_ = s3a.onIamConfigChange(resp.Directory, message.OldEntry, nil)
			_ = s3a.onOIDCProviderChange(resp.Directory, message.OldEntry, nil)
			_ = s3a.onCircuitBreakerConfigChange(resp.Directory, message.OldEntry, nil)
		}

		// For same-directory renames, replay a delete event for the old name
		// so handlers can clean up stale state (e.g., old bucket names)
		if message.OldEntry != nil && message.NewEntry != nil &&
			(message.NewParentPath == "" || message.NewParentPath == resp.Directory) &&
			message.OldEntry.Name != message.NewEntry.Name {
			_ = s3a.onBucketMetadataChange(dir, message.OldEntry, nil)
			_ = s3a.onCircuitBreakerConfigChange(dir, message.OldEntry, nil)
		}

		return nil
	}

	metadataFollowOption := &pb.MetadataFollowOption{
		ClientName:             clientName,
		ClientId:               s3a.randomClientId,
		ClientEpoch:            1,
		SelfSignature:          0,
		PathPrefix:             prefix,
		AdditionalPathPrefixes: nil,
		DirectoriesToWatch:     directoriesToWatch,
		StartTsNs:              lastTsNs,
		StopTsNs:               0,
		EventErrorType:         pb.FatalOnError,
	}

	util.RetryUntil("followIamChanges", func() error {
		metadataFollowOption.ClientEpoch++
		return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
	}, func(err error) bool {
		glog.V(1).Infof("iam follow metadata changes: %v", err)
		return true
	})
}

// onIamConfigChange handles IAM config file changes (create, update, delete)
func (s3a *S3ApiServer) onIamConfigChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
	if s3a.iam != nil && s3a.iam.IsStaticConfig() {
		glog.V(1).Infof("Skipping IAM config update for static configuration")
		return nil
	}
	if s3a.iam == nil {
		return nil
	}

	reloadIamConfig := func(reason string) error {
		glog.V(1).Infof("IAM change detected in %s, reloading configuration", reason)
		if err := s3a.iam.LoadS3ApiConfigurationFromCredentialManager(); err != nil {
			glog.Errorf("failed to reload IAM configuration after change in %s: %v", reason, err)
			return err
		}
		return nil
	}

	// 1. Handle traditional single identity.json file
	if dir == filer.IamConfigDirectory {
		// Handle create/update/delete events on legacy identity.json.
		// During migration this file is renamed, which emits a delete event.
		// Always reload from the credential manager so we keep the migrated identities.
		if (oldEntry != nil && oldEntry.Name == filer.IamIdentityFile) ||
			(newEntry != nil && newEntry.Name == filer.IamIdentityFile) {
			if err := reloadIamConfig(dir + "/" + filer.IamIdentityFile); err != nil {
				return err
			}
		}
		return nil
	}

	// 2. Handle multiple-file identities, policies, service accounts, and groups
	// Watch /etc/iam/{identities,policies,service_accounts,groups}
	isIdentityDir := dir == filer.IamConfigDirectory+"/identities" || strings.HasPrefix(dir, filer.IamConfigDirectory+"/identities/")
	isPolicyDir := dir == filer.IamConfigDirectory+"/policies" || strings.HasPrefix(dir, filer.IamConfigDirectory+"/policies/")
	isServiceAccountDir := dir == filer.IamConfigDirectory+"/service_accounts" || strings.HasPrefix(dir, filer.IamConfigDirectory+"/service_accounts/")
	isGroupDir := dir == filer.IamConfigDirectory+"/groups" || strings.HasPrefix(dir, filer.IamConfigDirectory+"/groups/")

	if isIdentityDir || isPolicyDir || isServiceAccountDir || isGroupDir {
		// For multiple-file mode, any change in these directories should trigger a full reload
		// from the credential manager (which handles the details of loading from multiple files).
		if err := reloadIamConfig(dir); err != nil {
			return err
		}
	}
	return nil
}

// onOIDCProviderChange refreshes the IAM-managed OIDC provider runtime view
// whenever the persisted store under /etc/iam/oidc-providers changes — both
// for mutations originated on this S3 server (the local IAM API also calls
// RefreshOIDCProvidersFromStore inline, but the subscribe path costs nothing
// extra) and for mutations originated on peer S3 servers, which this is the
// only mechanism to learn about. A single refresh covers create, update,
// delete, and rename because the store is small and a full reload is the
// safest way to reach a consistent view.
func (s3a *S3ApiServer) onOIDCProviderChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
	if dir != oidcProvidersDir && !strings.HasPrefix(dir, oidcProvidersDir+"/") {
		return nil
	}
	if s3a.iam == nil || s3a.iam.iamIntegration == nil {
		return nil
	}
	s3iam, ok := s3a.iam.iamIntegration.(*S3IAMIntegration)
	if !ok || s3iam.iamManager == nil {
		return nil
	}
	if err := s3iam.iamManager.RefreshOIDCProvidersFromStore(context.Background()); err != nil {
		glog.Warningf("OIDC provider refresh after %s change failed: %v", dir, err)
		return err
	}
	glog.V(2).Infof("Refreshed IAM-managed OIDC providers after %s change", dir)
	return nil
}

// onCircuitBreakerConfigChange handles circuit breaker config file changes (create, update, delete)
func (s3a *S3ApiServer) onCircuitBreakerConfigChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
	if dir != s3_constants.CircuitBreakerConfigDir {
		return nil
	}

	// Handle deletion: reset to empty config
	if newEntry == nil && oldEntry != nil && oldEntry.Name == s3_constants.CircuitBreakerConfigFile {
		glog.V(1).Infof("Circuit breaker config file deleted, resetting to defaults")
		if err := s3a.cb.LoadS3ApiConfigurationFromBytes([]byte{}); err != nil {
			glog.Warningf("failed to reset circuit breaker config on deletion: %v", err)
			return err
		}
		return nil
	}

	// Handle create/update
	if newEntry != nil && newEntry.Name == s3_constants.CircuitBreakerConfigFile {
		if err := s3a.cb.LoadS3ApiConfigurationFromBytes(newEntry.Content); err != nil {
			return err
		}
		glog.V(1).Infof("updated %s/%s", dir, newEntry.Name)
	}
	return nil
}

// onBucketMetadataChange reloads bucket metadata when an entry directly under the buckets path changes
func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
	if dir == s3a.option.BucketsPath {
		if newEntry != nil {
			// Update bucket registry (existing functionality)
			s3a.bucketRegistry.LoadBucketMetadata(newEntry)
			glog.V(1).Infof("updated bucketMetadata %s/%s", dir, newEntry.Name)

			// Update bucket configuration cache with new entry
			s3a.updateBucketConfigCacheFromEntry(newEntry)
		} else if oldEntry != nil {
			// Remove from bucket registry (existing functionality)
			s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
			glog.V(1).Infof("remove bucketMetadata %s/%s", dir, oldEntry.Name)

			// Remove from bucket configuration cache
			s3a.invalidateBucketConfigCache(oldEntry.Name)
		}
	}
	return nil
}

// updateBucketConfigCacheFromEntry updates the bucket config cache when a bucket entry changes
func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry) {
	if s3a.bucketConfigCache == nil {
		return
	}

	bucket := entry.Name

	glog.V(3).Infof("updateBucketConfigCacheFromEntry: called for bucket %s, ExtObjectLockEnabledKey=%s",
		bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey]))

	// Create new bucket config from the entry. populateBucketConfigDerivedFields
	// is the single source of truth for mapping Entry.Extended → cached
	// fields (incl. LifecycleTTL), so a meta-log Put/DeleteBucketLifecycle
	// here can't leave a stale resolver in cache.
	config := &BucketConfig{
		Name:  bucket,
		Entry: entry,
	}
	s3a.populateBucketConfigDerivedFields(config)

	// Update timestamp
	config.LastModified = time.Now()

	// Update cache
	glog.V(3).Infof("updateBucketConfigCacheFromEntry: updating cache for bucket %s, ObjectLockConfig=%+v", bucket, config.ObjectLockConfig)
	s3a.bucketConfigCache.Set(bucket, config)

	// Remove from negative cache since bucket now exists
	// This is important for buckets created via weed shell or other external means
	s3a.bucketConfigCache.RemoveNegativeCache(bucket)
}

// invalidateBucketConfigCache removes a bucket from the configuration cache
func (s3a *S3ApiServer) invalidateBucketConfigCache(bucket string) {
	if s3a.bucketConfigCache == nil {
		return
	}

	s3a.bucketConfigCache.Remove(bucket)
	s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Also remove from negative cache
	glog.V(2).Infof("invalidateBucketConfigCache: removed bucket %s from cache", bucket)
}
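
The event fan-out in subscribeMetaEvents reduces to a pure function over
the event fields it reads. In this sketch `event`, `call`, and `dispatch`
are hypothetical, and entries are reduced to their names ("" for nil). In
the source, the same-directory rename replay goes only to the bucket and
circuit-breaker handlers, while the cross-directory replay goes to all four.

```go
package main

import "fmt"

// event mirrors the SubscribeMetadataResponse fields subscribeMetaEvents
// reads; entries are reduced to names, with "" standing for nil.
type event struct {
	Directory     string
	NewParentPath string
	OldName       string
	NewName       string
}

// call is one handler invocation: (dir, oldEntry, newEntry).
type call struct{ Dir, Old, New string }

func dispatch(e event) []call {
	dir := e.Directory
	if e.NewParentPath != "" {
		dir = e.NewParentPath // destination directory for moves
	}
	calls := []call{{dir, e.OldName, e.NewName}}
	// Cross-directory move: replay a delete in the source directory.
	if e.NewParentPath != "" && e.Directory != e.NewParentPath {
		calls = append(calls, call{e.Directory, e.OldName, ""})
	}
	// Same-directory rename: replay a delete for the old name.
	if e.OldName != "" && e.NewName != "" &&
		(e.NewParentPath == "" || e.NewParentPath == e.Directory) &&
		e.OldName != e.NewName {
		calls = append(calls, call{dir, e.OldName, ""})
	}
	return calls
}

func main() {
	fmt.Println(dispatch(event{Directory: "/buckets", NewParentPath: "/buckets", OldName: "a", NewName: "b"}))
}
```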