diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 03259aca4..6f23ccdc6 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -214,55 +214,15 @@ func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry) glog.V(3).Infof("updateBucketConfigCacheFromEntry: called for bucket %s, ExtObjectLockEnabledKey=%s", bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey])) - // Create new bucket config from the entry + // Create new bucket config from the entry. populateBucketConfigDerivedFields + // is the single source of truth for mapping Entry.Extended → cached + // fields (incl. LifecycleTTL), so a meta-log Put/DeleteBucketLifecycle + // here can't leave a stale resolver in cache. config := &BucketConfig{ - Name: bucket, - Entry: entry, - IsPublicRead: false, // Explicitly default to false for private buckets - } - - // Extract configuration from extended attributes - if entry.Extended != nil { - if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists { - config.Versioning = string(versioning) - } - if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists { - config.Ownership = string(ownership) - } - if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists { - config.ACL = acl - // Parse ACL and cache public-read status - config.IsPublicRead = parseAndCachePublicReadStatus(acl) - } else { - // No ACL means private bucket - config.IsPublicRead = false - } - if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists { - config.Owner = string(owner) - } - // Parse Object Lock configuration if present - if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found { - config.ObjectLockConfig = objectLockConfig - glog.V(2).Infof("updateBucketConfigCacheFromEntry: cached Object Lock configuration for bucket %s: %+v", bucket, objectLockConfig) - } else { - 
glog.V(3).Infof("updateBucketConfigCacheFromEntry: no Object Lock configuration found for bucket %s", bucket) - } - - // Load bucket policy if present (for performance optimization) - config.BucketPolicy = loadBucketPolicyFromExtended(entry, bucket) - } - - // Sync bucket policy to the policy engine for evaluation - s3a.syncBucketPolicyToEngine(bucket, config.BucketPolicy) - - // Parse CORS configuration directly from the subscription entry's Content field. - // This avoids a separate RPC call that could return stale data when racing with - // concurrent metadata updates (e.g., PutBucketCors clearing the cache while this - // handler is still processing an older event). - config.CORS = parseCORSFromEntryContent(entry.Content) - if config.CORS != nil { - glog.V(2).Infof("updateBucketConfigCacheFromEntry: parsed CORS config for bucket %s from entry content", bucket) + Name: bucket, + Entry: entry, } + s3a.populateBucketConfigDerivedFields(config) // Update timestamp config.LastModified = time.Now() diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go index c9c108df5..dfdb7baec 100644 --- a/weed/s3api/s3api_bucket_config.go +++ b/weed/s3api/s3api_bucket_config.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/cors" + "github.com/seaweedfs/seaweedfs/weed/s3api/lifecycle_xml" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" @@ -34,7 +35,15 @@ type BucketConfig struct { CORS *cors.CORSConfiguration ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration BucketPolicy *policy_engine.PolicyDocument // Cached bucket policy for performance - KMSKeyCache *BucketKMSCache // Per-bucket KMS key cache for SSE-KMS operations + // LifecycleTTL answers "what volume TTL should this PutObject 
get?" + // using only fast-path-safe predicates (prefix + size; tags excluded + // because they're mutable post-PUT). nil = no TTL applies (no + // lifecycle config, versioned bucket, or only ineligible rules). + // The full canonical rule set lives inside the resolver; the + // lifecycle worker reads bucket entries directly off the meta-log + // rather than this cache. + LifecycleTTL *LifecycleTTLResolver + KMSKeyCache *BucketKMSCache // Per-bucket KMS key cache for SSE-KMS operations LastModified time.Time Entry *filer_pb.Entry } @@ -371,11 +380,43 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err Entry: entry, IsPublicRead: false, // Explicitly default to false for private buckets } + s3a.populateBucketConfigDerivedFields(config) + + // Cache the result + s3a.bucketConfigCache.Set(bucket, config) + + return config, s3err.ErrNone +} + +// populateBucketConfigDerivedFields fills every field on BucketConfig that is +// derived from Entry.Extended / Entry.Content (versioning flag, ACL, owner, +// object lock, bucket policy, CORS, lifecycle TTL resolver). It is the +// single source of truth for that mapping; callers that take a fresh +// BucketConfig (getBucketConfig, updateBucketConfig after the user's update +// fn runs, the meta-log subscription cache refresher) all funnel through +// here so a missed field can't silently keep stale data — e.g. a stale +// LifecycleTTL after a Put/DeleteBucketLifecycle would keep stamping the +// old policy's irreversible volume TTL onto new writes. +func (s3a *S3ApiServer) populateBucketConfigDerivedFields(config *BucketConfig) { + // Reset every derived field so stale values from a previous Entry + // don't survive a clear (e.g. DELETE policy → BucketPolicy=nil). 
+ config.Versioning = "" + config.Ownership = "" + config.ACL = nil + config.Owner = "" + config.IsPublicRead = false + config.ObjectLockConfig = nil + config.BucketPolicy = nil + config.LifecycleTTL = nil + config.CORS = nil + + entry := config.Entry + if entry == nil { + return + } + bucket := config.Name - // Extract configuration from extended attributes if entry.Extended != nil { - glog.V(3).Infof("getBucketConfig: checking extended attributes for bucket %s, ExtObjectLockEnabledKey value=%s", - bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey])) if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists { config.Versioning = string(versioning) } @@ -384,38 +425,43 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err } if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists { config.ACL = acl - // Parse ACL once and cache public-read status + // Parse ACL once and cache public-read status. config.IsPublicRead = parseAndCachePublicReadStatus(acl) - } else { - // No ACL means private bucket - config.IsPublicRead = false } if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists { config.Owner = string(owner) } - // Parse Object Lock configuration if present if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found { config.ObjectLockConfig = objectLockConfig - glog.V(3).Infof("getBucketConfig: loaded Object Lock config from extended attributes for bucket %s: %+v", bucket, objectLockConfig) - } else { - glog.V(3).Infof("getBucketConfig: no Object Lock config found in extended attributes for bucket %s", bucket) } - - // Load bucket policy if present (for performance optimization) config.BucketPolicy = loadBucketPolicyFromExtended(entry, bucket) + + // Pre-parse lifecycle XML so the per-write TTL resolver doesn't + // pay parsing cost on every PutObject. 
nil on parse error so + // the PUT path falls through to "no TTL" rather than rejecting + // writes. + if xmlBytes, ok := entry.Extended[bucketLifecycleConfigurationXMLKey]; ok && len(xmlBytes) > 0 { + if rules, err := lifecycle_xml.ParseCanonical(xmlBytes); err == nil { + // Object Lock requires versioning, so an ObjectLockConfig + // implies the bucket is versioned even when the explicit + // Versioning header was never written. BucketIsVersioned + // in this file uses the same OR — keep them aligned. + versioned := config.Versioning == s3_constants.VersioningEnabled || + config.Versioning == s3_constants.VersioningSuspended || + config.ObjectLockConfig != nil + config.LifecycleTTL = NewLifecycleTTLResolver(rules, versioned) + } else { + glog.V(1).Infof("populateBucketConfigDerivedFields: bucket %s lifecycle xml parse: %v", bucket, err) + } + } } - // Sync bucket policy to the policy engine for evaluation + // Sync bucket policy to the policy engine for evaluation. s3a.syncBucketPolicyToEngine(bucket, config.BucketPolicy) // Parse CORS configuration directly from the entry's Content field. // This avoids a redundant RPC call since we already have the entry. config.CORS = parseCORSFromEntryContent(entry.Content) - - // Cache the result - s3a.bucketConfigCache.Set(bucket, config) - - return config, s3err.ErrNone } // updateBucketConfig updates bucket configuration and invalidates cache @@ -487,7 +533,12 @@ func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketC } glog.V(3).Infof("updateBucketConfig: saved entry to filer for bucket %s", bucket) - // Update cache + // Update cache. Re-derive every Extended-backed field from the + // just-saved Entry — the user's update fn may have flipped, added, + // or cleared bytes (e.g. PutBucketLifecycle / DeleteBucketLifecycle + // rewrites the lifecycle XML key) and the resolver / parsed configs + // must follow. 
+ s3a.populateBucketConfigDerivedFields(nextConfig) s3a.bucketConfigCache.Set(bucket, nextConfig) return s3err.ErrNone diff --git a/weed/s3api/s3api_object_handlers_copy_part_sse.go b/weed/s3api/s3api_object_handlers_copy_part_sse.go index 47f496c2f..36519446e 100644 --- a/weed/s3api/s3api_object_handlers_copy_part_sse.go +++ b/weed/s3api/s3api_object_handlers_copy_part_sse.go @@ -395,7 +395,9 @@ func (s3a *S3ApiServer) copyObjectPartViaReencryption( // on the UploadPartCopy response. Without this, clients have no way to // see that the destination was encrypted. filePath := s3a.genPartUploadPath(dstBucket, uploadID, partID) - tag, code, putSSE := s3a.putToFiler(cloned, filePath, srcReader, dstBucket, "", partID, nil) + // Copy-part is an MPU part write under .uploads/<uploadID>/; lifecycle + // TTL only applies to the eventual completed object. Pass 0. + tag, code, putSSE := s3a.putToFiler(cloned, filePath, srcReader, dstBucket, "", partID, 0, nil) if code != s3err.ErrNone { return "", SSEResponseMetadata{}, code } diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index ddd0daa09..cb53d51a3 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -452,7 +452,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d", bucket, object, uploadID, partID, r.ContentLength) - etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, "", partID, nil) + // MPU parts must NOT inherit the bucket's lifecycle Expiration.Days + // volume TTL: the rule targets the user-visible object, not the + // transient .uploads/<uploadID>/ path, and a part write would otherwise + // start the TTL clock before CompleteMultipartUpload ever assembled + // the object. 
+ etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, "", partID, 0, nil) if errCode != s3err.ErrNone { glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d", errCode, bucket, object, partID) diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index c82cdfabd..94a461b6b 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -131,7 +131,11 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R // Forward validated POST form fields to the underlying PUT as headers. applyPostPolicyFormHeaders(r, formValues) - etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, object, 1, nil) + // Use fileSize, not r.ContentLength: the multipart body wrapping form + // fields and boundaries inflates ContentLength relative to the + // object body, which would mis-evaluate any size-filtered rule. 
+ ttlSec := s3a.lifecycleTTLForObjectWrite(bucket, object, fileSize) + etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, object, 1, ttlSec, nil) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 381f6b4bb..aaf418a24 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -296,7 +296,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) dataReader = mimeDetect(r, dataReader) } - etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, object, 1, nil) + ttlSec := s3a.lifecycleTTLForObjectWrite(bucket, object, r.ContentLength) + etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, object, 1, ttlSec, nil) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -351,7 +352,14 @@ func (s3a *S3ApiServer) withObjectWriteLock(bucket, object string, preconditionF return fn() } -func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { +// putToFiler writes one chunk of object bytes (a full PutObject body, a +// single MPU part, a copy-part destination). lifecycleTTLSec is non-zero +// only for top-level PutObject paths where the lifecycle XML's +// Expiration.Days fast path applies — MPU parts and copy-parts always +// pass 0 because their own keys aren't the user-visible object the rule +// targets and a part write would otherwise bind a TTL clock starting +// before CompleteMultipartUpload. 
+func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, lifecycleTTLSec int32, afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy // This eliminates the filer proxy overhead for PUT operations // Note: filePath is now passed directly instead of URL (no parsing needed) @@ -465,6 +473,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader DataCenter: s3a.option.DataCenter, Path: filePath, ExpectedDataSize: expectedDataSize, + TtlSec: lifecycleTTLSec, }) if err != nil { return fmt.Errorf("assign volume: %w", err) @@ -639,6 +648,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader Gid: 0, Mime: mimeType, FileSize: uint64(chunkResult.TotalSize), + TtlSec: lifecycleTTLSec, }, Chunks: chunkResult.FileChunks, // All chunks from auto-chunking Extended: make(map[string][]byte), @@ -1261,8 +1271,10 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob } } - // Upload the file using putToFiler - this will create the file with version metadata - etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, nil) + // Upload the file using putToFiler - this will create the file with version metadata. + // Versioned/suspended bucket → resolver returns 0 by construction; + // pass 0 directly so the path is explicit at the call site. 
+ etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, 0, nil) if errCode != s3err.ErrNone { glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) return "", errCode, SSEResponseMetadata{} @@ -1412,7 +1424,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin } } - etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, func(versionEntry *filer_pb.Entry) s3err.ErrorCode { + // Versioned bucket: resolver returns 0 by construction. Pass 0 + // directly — versioned objects sit on regular volumes and the + // lifecycle worker handles their expiration. + etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, 0, func(versionEntry *filer_pb.Entry) s3err.ErrorCode { if err := s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName, versionEntry); err != nil { glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) return s3err.ErrInternalError diff --git a/weed/s3api/s3api_object_lifecycle_ttl.go b/weed/s3api/s3api_object_lifecycle_ttl.go new file mode 100644 index 000000000..2e11b7723 --- /dev/null +++ b/weed/s3api/s3api_object_lifecycle_ttl.go @@ -0,0 +1,152 @@ +package s3api + +import ( + "math" + "sort" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" +) + +// secondsPerDay is the conversion factor between Lifecycle Expiration.Days +// (a calendar-day count) and the volume server's TTL field (seconds since +// creation). This intentionally does NOT use AWS's "next 00:00 UTC" rounding; +// that's an expiration-firing nuance the lifecycle worker enforces, not +// something the per-write fast path can model without reading the clock at +// each write. +const secondsPerDay = int64(86400) + +// LifecycleTTLResolver answers "what volume TTL should this write get?" for +// the PutObject path. 
Constructed once per bucket-config load with a +// pre-filtered, pre-sorted slice of compact rules so per-write cost is one +// HasPrefix per rule walked, exiting on first match. +// +// Stable predicates only: prefix and size. Tag-filtered rules are NOT in +// the fast path because tags can be replaced post-PUT via PutObjectTagging +// while volume TTL is irreversible — an object that matched at write time +// would still expire after the tag was removed. The lifecycle worker +// re-evaluates current tags at scan time. +// +// nil receiver means "no TTL applies" (no eligible rules, bucket +// versioned, or every rule overflows int32 seconds); callers can use a +// nil resolver freely. +type LifecycleTTLResolver struct { + rules []ttlRule +} + +// ttlRule is the compact, hot-path projection of an Expiration.Days rule: +// just the four fields Resolve reads, with ExpirationDays already converted +// to int32 seconds so the inner loop has no arithmetic and no overflow +// branch. +type ttlRule struct { + prefix string + ttlSec int32 + sizeGT int64 + sizeLT int64 +} + +// NewLifecycleTTLResolver pre-filters and pre-sorts rules. Returns nil +// when nothing on the fast path can apply — callers don't need to special- +// case the empty-bucket / versioned-bucket / tag-only-rules cases. +// +// Sort is ascending by ttlSec so first prefix match is also the shortest +// matching expiration — AWS's overlapping-rule precedence (see +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-conflicts.html). +// Stable so equal-Days rules keep their XML order. +// +// Rules that overflow int32 seconds (~68 years) are dropped at construction +// rather than handled per-resolve: capping would expire long policies +// early, and returning 0 from Resolve in the inner loop would prevent any +// shorter overlapping rule from being considered. Drop-at-construction +// composes correctly with the ascending sort. 
+func NewLifecycleTTLResolver(rules []*s3lifecycle.Rule, versioned bool) *LifecycleTTLResolver { + if versioned || len(rules) == 0 { + // Versioned buckets: TTL volumes expire as a unit, which would + // destroy noncurrent versions. Worker drives expiration there. + return nil + } + out := make([]ttlRule, 0, len(rules)) + for _, r := range rules { + if r == nil || r.Status != s3lifecycle.StatusEnabled { + continue + } + if r.ExpirationDays <= 0 { + continue // NoncurrentVersionExpiration / AbortMPU / etc. + } + if len(r.FilterTags) > 0 { + // Tag-mutable; defer to the worker so a tag flip can't leave + // us with a volume-TTL stamp the policy no longer dictates. + continue + } + secs := int64(r.ExpirationDays) * secondsPerDay + if secs > math.MaxInt32 { + // Volume TTL is int32 seconds. A rule that doesn't fit + // can't be represented without expiring early; the + // lifecycle worker enforces it on its own schedule. + continue + } + out = append(out, ttlRule{ + prefix: r.Prefix, + ttlSec: int32(secs), + sizeGT: r.FilterSizeGreaterThan, + sizeLT: r.FilterSizeLessThan, + }) + } + if len(out) == 0 { + return nil + } + sort.SliceStable(out, func(i, j int) bool { + return out[i].ttlSec < out[j].ttlSec + }) + return &LifecycleTTLResolver{rules: out} +} + +// Resolve returns the volume TTL (in seconds) for a write of the given +// object key and size, or 0 when no fast-path rule applies. +// +// The receiver may be nil — that's the common "no rules" case and it +// returns 0 without allocating. +func (r *LifecycleTTLResolver) Resolve(objectKey string, size int64) int32 { + if r == nil { + return 0 + } + for i := range r.rules { + rule := &r.rules[i] + if !strings.HasPrefix(objectKey, rule.prefix) { + continue + } + // Size filter: unevaluable when Content-Length is unknown + // (size<0) and the rule has any size predicate; otherwise + // either bound short-circuits. 
+ if rule.sizeGT > 0 || rule.sizeLT > 0 { + if size < 0 { + continue + } + if rule.sizeGT > 0 && size <= rule.sizeGT { + continue + } + if rule.sizeLT > 0 && size >= rule.sizeLT { + continue + } + } + return rule.ttlSec + } + return 0 +} + +// lifecycleTTLForObjectWrite is the PutObject call-site wrapper. Returns 0 +// for any caller (MPU part, copy-part) that shouldn't bind a TTL clock — +// see putToFiler's signature comment for which paths pass 0 directly. +// +// Callers MUST pass the actual object size, not r.ContentLength when those +// differ. r.ContentLength is the wire size: for a multipart PostPolicy +// upload it includes form fields and boundaries, so a size-filtered rule +// would mis-evaluate against the form total instead of the file body. +// objectSize<0 is "unknown" — the resolver skips any size-filtered rule. +func (s3a *S3ApiServer) lifecycleTTLForObjectWrite(bucket, objectKey string, objectSize int64) int32 { + cfg, _ := s3a.getBucketConfig(bucket) + if cfg == nil || cfg.LifecycleTTL == nil { + return 0 + } + return cfg.LifecycleTTL.Resolve(objectKey, objectSize) +} diff --git a/weed/s3api/s3api_object_lifecycle_ttl_test.go b/weed/s3api/s3api_object_lifecycle_ttl_test.go new file mode 100644 index 000000000..302c62aff --- /dev/null +++ b/weed/s3api/s3api_object_lifecycle_ttl_test.go @@ -0,0 +1,262 @@ +package s3api + +import ( + "math" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" +) + +func enabledRule(prefix string, days int) *s3lifecycle.Rule { + return &s3lifecycle.Rule{ + ID: "r-" + prefix, + Status: s3lifecycle.StatusEnabled, + Prefix: prefix, + ExpirationDays: days, + } +} + +func mustResolver(t *testing.T, rules ...*s3lifecycle.Rule) *LifecycleTTLResolver { + t.Helper() + return NewLifecycleTTLResolver(rules, false) +} + +func TestNewLifecycleTTLResolver_NilOnEmpty(t *testing.T) { + if got := 
NewLifecycleTTLResolver(nil, false); got != nil { + t.Fatalf("nil rules → resolver=%v, want nil", got) + } + if got := NewLifecycleTTLResolver([]*s3lifecycle.Rule{}, false); got != nil { + t.Fatalf("empty rules → resolver=%v, want nil", got) + } +} + +func TestNewLifecycleTTLResolver_NilOnVersionedBucket(t *testing.T) { + // Versioned buckets cannot use volume TTL — TTL volumes destroy + // noncurrent versions as a unit. Resolver collapses to nil so the + // PUT path's nil-receiver Resolve returns 0 without checking flags. + rules := []*s3lifecycle.Rule{enabledRule("logs/", 7)} + if got := NewLifecycleTTLResolver(rules, true); got != nil { + t.Fatalf("versioned bucket → resolver=%v, want nil", got) + } +} + +func TestNewLifecycleTTLResolver_DropsTagFilteredRules(t *testing.T) { + // HIGH-priority finding: tag-filtered rules are unsafe on the fast + // path. Tags can be replaced via PutObjectTagging after the write, + // but the volume TTL is irreversible. Worker handles tag-filtered + // rules at scan time; the fast path drops them entirely. + tagRule := enabledRule("logs/", 7) + tagRule.FilterTags = map[string]string{"k": "v"} + plainRule := enabledRule("data/", 30) + + r := mustResolver(t, tagRule, plainRule) + if r == nil { + t.Fatalf("plain rule should still produce a resolver") + } + // Tag-filtered key would have matched but is now invisible. 
+ if got := r.Resolve("logs/foo", 1); got != 0 { + t.Fatalf("tag-filtered rule must not appear on fast path, got %d", got) + } + if got := r.Resolve("data/foo", 1); got != 30*86400 { + t.Fatalf("plain rule still applies, got %d", got) + } +} + +func TestNewLifecycleTTLResolver_DropsDisabledAndNonExpirationDays(t *testing.T) { + disabled := enabledRule("logs/", 7) + disabled.Status = s3lifecycle.StatusDisabled + noExp := &s3lifecycle.Rule{ + ID: "r", Status: s3lifecycle.StatusEnabled, Prefix: "logs/", + NoncurrentVersionExpirationDays: 7, + } + if got := NewLifecycleTTLResolver([]*s3lifecycle.Rule{disabled, noExp}, false); got != nil { + t.Fatalf("only-ineligible rules → resolver=%v, want nil", got) + } +} + +func TestResolve_PrefixMatch(t *testing.T) { + r := mustResolver(t, enabledRule("logs/", 7)) + if got := r.Resolve("logs/foo.txt", 1); got != 7*86400 { + t.Fatalf("want 7d in seconds, got %d", got) + } + if got := r.Resolve("data/foo.txt", 1); got != 0 { + t.Fatalf("non-matching prefix should yield 0, got %d", got) + } +} + +func TestResolve_OverlappingRulesShorterExpirationWins(t *testing.T) { + // MEDIUM-priority finding: AWS overlapping-rule precedence is + // "shorter expiration wins". Sort ascending by ExpirationDays so + // the first prefix match is also the shortest applicable rule. + r := mustResolver(t, + enabledRule("logs/", 30), // broad, long + enabledRule("logs/critical/", 90), // specific, longer + enabledRule("logs/", 7), // broad, short + ) + // "logs/foo" matches both broad rules; the shorter (7d) wins. + if got := r.Resolve("logs/foo", 1); got != 7*86400 { + t.Fatalf("shorter expiration must win, got %d (want 7d)", got) + } + // "logs/critical/x" matches all three; 7d still wins (shorter than + // the more specific 90d). Longest-prefix is NOT the AWS rule. 
+ if got := r.Resolve("logs/critical/x", 1); got != 7*86400 { + t.Fatalf("shorter expiration must win across overlaps, got %d (want 7d)", got) + } +} + +func TestResolve_OverflowDefersToWorker(t *testing.T) { + // MEDIUM-priority finding: capping at math.MaxInt32 seconds (~68y) + // would expire LONGER policies early. Return 0 instead so the + // worker enforces the actual policy on its own schedule. + bigDays := int(math.MaxInt32/secondsPerDay) + 1 + r := mustResolver(t, enabledRule("anything", bigDays)) + if got := r.Resolve("anything-x", 1); got != 0 { + t.Fatalf("overflow must yield 0 (worker handles), got %d", got) + } +} + +func TestResolve_OverflowSkipsButShorterStillFires(t *testing.T) { + // Pathological case: short and overflowing rule on overlapping + // prefix. Ascending sort puts the short one first; the overflow + // rule never gets a chance to mis-cap. + bigDays := int(math.MaxInt32/secondsPerDay) + 1 + r := mustResolver(t, + enabledRule("anything", bigDays), + enabledRule("anything", 7), + ) + if got := r.Resolve("anything-x", 1); got != 7*86400 { + t.Fatalf("shorter rule must still fire on overlap, got %d", got) + } +} + +func TestResolve_SizeFilter(t *testing.T) { + rule := enabledRule("logs/", 7) + rule.FilterSizeGreaterThan = 1024 + r := mustResolver(t, rule) + + // Below threshold → skip. + if got := r.Resolve("logs/foo", 100); got != 0 { + t.Fatalf("size <= threshold must skip, got %d", got) + } + // Above threshold → apply. + if got := r.Resolve("logs/foo", 2048); got != 7*86400 { + t.Fatalf("size > threshold must apply, got %d", got) + } + // Unknown size + size filter → skip (conservative). + if got := r.Resolve("logs/foo", -1); got != 0 { + t.Fatalf("unknown size with filter must skip, got %d", got) + } +} + +func TestResolve_NilReceiverReturnsZero(t *testing.T) { + // nil-receiver-safe Resolve avoids the call site needing to check + // whether the bucket has a resolver at all. 
+ var r *LifecycleTTLResolver + if got := r.Resolve("logs/foo", 1); got != 0 { + t.Fatalf("nil resolver must return 0, got %d", got) + } +} + +func BenchmarkLifecycleTTLResolver_Resolve_NilReceiver(b *testing.B) { + // Common case: bucket has no lifecycle config → resolver is nil. + var r *LifecycleTTLResolver + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = r.Resolve("logs/foo.txt", 4096) + } +} + +func BenchmarkLifecycleTTLResolver_Resolve_OneRule(b *testing.B) { + // Typical case: one Expiration.Days rule that the key matches. + r := NewLifecycleTTLResolver([]*s3lifecycle.Rule{ + enabledRule("logs/", 7), + }, false) + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = r.Resolve("logs/foo.txt", 4096) + } +} + +func BenchmarkLifecycleTTLResolver_Resolve_FiveRulesNoMatch(b *testing.B) { + // Worst typical case: walks all rules and none match. + r := NewLifecycleTTLResolver([]*s3lifecycle.Rule{ + enabledRule("a/", 1), + enabledRule("b/", 7), + enabledRule("c/", 30), + enabledRule("d/", 90), + enabledRule("e/", 365), + }, false) + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = r.Resolve("z/foo.txt", 4096) + } +} + +func TestPopulateBucketConfigDerivedFields_RefreshesLifecycleTTL(t *testing.T) { + // Regression: storeBucketLifecycleConfiguration only updates + // Entry.Extended; if the cache-refresh path doesn't re-derive + // LifecycleTTL, an Add → Update → Delete dance would leave a stale + // resolver applying the wrong volume TTL to subsequent writes. Walk + // the three transitions and assert the resolver follows. + s := &S3ApiServer{} + + xmlAdd := []byte(`<LifecycleConfiguration><Rule><ID>r</ID><Status>Enabled</Status><Filter><Prefix>logs/</Prefix></Filter><Expiration><Days>7</Days></Expiration></Rule></LifecycleConfiguration>`) + xmlReplace := []byte(`<LifecycleConfiguration><Rule><ID>r</ID><Status>Enabled</Status><Filter><Prefix>logs/</Prefix></Filter><Expiration><Days>30</Days></Expiration></Rule></LifecycleConfiguration>`) + + cfg := &BucketConfig{Name: "bk", Entry: &filer_pb.Entry{Extended: map[string][]byte{}}} + + // 1) No XML yet → no resolver. + s.populateBucketConfigDerivedFields(cfg) + if cfg.LifecycleTTL != nil { + t.Fatalf("no XML must yield nil resolver, got %v", cfg.LifecycleTTL) + } + + // 2) Add: 7d. 
+ cfg.Entry.Extended[bucketLifecycleConfigurationXMLKey] = xmlAdd + s.populateBucketConfigDerivedFields(cfg) + if got := cfg.LifecycleTTL.Resolve("logs/foo", 1); got != 7*86400 { + t.Fatalf("after add, want 7d, got %d", got) + } + + // 3) Replace: 30d. The previous resolver must NOT linger. + cfg.Entry.Extended[bucketLifecycleConfigurationXMLKey] = xmlReplace + s.populateBucketConfigDerivedFields(cfg) + if got := cfg.LifecycleTTL.Resolve("logs/foo", 1); got != 30*86400 { + t.Fatalf("after replace, want 30d, got %d (stale resolver?)", got) + } + + // 4) Delete: nil resolver. The most dangerous regression — leaving + // the old resolver here would keep stamping irreversible volume + // TTL onto writes after the policy was removed. + delete(cfg.Entry.Extended, bucketLifecycleConfigurationXMLKey) + s.populateBucketConfigDerivedFields(cfg) + if cfg.LifecycleTTL != nil { + t.Fatalf("after delete, want nil resolver, got %v", cfg.LifecycleTTL) + } +} + +func TestPopulateBucketConfigDerivedFields_ObjectLockTreatedAsVersioned(t *testing.T) { + // Object Lock requires versioning, so a bucket with ObjectLock but + // no explicit Versioning header is still effectively versioned — + // volume TTL would expire all noncurrent versions as a unit. The + // resolver-construction site must mirror BucketIsVersioned and + // treat ObjectLockConfig != nil as versioned. + s := &S3ApiServer{} + xml := []byte(`<LifecycleConfiguration><Rule><ID>r</ID><Status>Enabled</Status><Filter><Prefix>logs/</Prefix></Filter><Expiration><Days>7</Days></Expiration></Rule></LifecycleConfiguration>`) + cfg := &BucketConfig{ + Name: "bk", + Entry: &filer_pb.Entry{Extended: map[string][]byte{ + s3_constants.ExtObjectLockEnabledKey: []byte(s3_constants.ObjectLockEnabled), + bucketLifecycleConfigurationXMLKey: xml, + }}, + } + s.populateBucketConfigDerivedFields(cfg) + if cfg.ObjectLockConfig == nil { + t.Fatal("test setup: ObjectLockConfig should be parsed") + } + if cfg.LifecycleTTL != nil { + t.Fatalf("ObjectLock buckets must skip the fast-path resolver, got %v", cfg.LifecycleTTL) + } +}