From 3f4cb6d2fb3a86b19687d7942045501df4e95c82 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 11 May 2026 18:07:54 -0700 Subject: [PATCH] feat(s3/lifecycle/engine): daily-replay view surface (Phase 4 engine) (#9447) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(s3/lifecycle/engine): daily-replay view surface (Phase 4 engine) Adds the engine-side API the new daily-replay worker reaches for: per-view snapshot construction (RulesForShard, RecoveryView), the two cursor hashes that gate recovery (ReplayContentHash, PromotedHash), and the cursor sliding-window helper (MaxEffectiveTTL). CurrentSnapshot is a stub keyed on a package-level atomic that the worker startup wiring will populate; nothing calls SetCurrentEngine yet, so it returns nil until then. Views return new *Snapshot instances holding cloned *CompiledAction values so per-clone active/Mode never leak across partitions. Replay clones force Mode=ModeEventDriven to rehabilitate any persistent ModeScanOnly carried over from PriorState; walk and recovery clones preserve Mode as-is. Disabled actions are excluded from all views. No production caller is wired here — Phase 4's walker/dailyrun integration is the follow-up. In that follow-up, dailyrun's local helpers (localReplayContentHash, localMaxEffectiveTTL) become one-line redirects to these exports. API surface: - CurrentSnapshot() *Snapshot — stub until Phase 4 wiring. - SetCurrentEngine(*Engine) — Phase 4 wiring entry point. - Snapshot.RulesForShard(shardID, retentionWindow) (replay, walk *Snapshot) - RecoveryView(s *Snapshot) *Snapshot — force-active over the full set. - ReplayContentHash(s *Snapshot) [32]byte — partition-independent. - PromotedHash(s *Snapshot, retentionWindow) [32]byte — partition-flip. - MaxEffectiveTTL(s *Snapshot) time.Duration — over active replay only. 
30 unit tests covering clone isolation, Mode rewrite, partition membership including the multi-action-kind XML rule split, RecoveryView activating pre-BootstrapComplete actions, ReplayContentHash partition-independence, PromotedHash sensitivity to promotion in either direction, MaxEffectiveTTL aggregation. Build + race-tests green. * refactor(s3/lifecycle/engine): consolidate hash helpers; clarify shardID semantics Addresses PR #9447 review feedback. Three medium-priority items from gemini, all code-quality refinements (no behavior change): 1. Duplicated sort comparator between ReplayContentHash and PromotedHash. Extract sortHashItems shared helper so the two hashes use the same ordering by construction — if one drifted, the cursor could see a spurious "rule changed" on a no-op snapshot rebuild. 2. Duplicated writeField/writeInt closures. Extract hashWriter struct holding the sha256 running hash + lenbuf, with method helpers. Same allocation profile (one Hash, one tiny stack buffer per helper); just deduplicates ~20 lines. 3. shardID parameter on RulesForShard is unused. Per the design's open question, every shard sees every rule today (shard filter runs at the entry-iteration site, not view construction). Keep the parameter for API stability — removing it now would force a breaking change when bucket-shard ownership lands — and update the doc comment to explain why it's reserved. go build ./... clean; engine test suite green. 
--- weed/s3api/s3lifecycle/engine/hashes.go | 205 ++++++++++++ weed/s3api/s3lifecycle/engine/hashes_test.go | 273 ++++++++++++++++ weed/s3api/s3lifecycle/engine/views.go | 234 +++++++++++++ weed/s3api/s3lifecycle/engine/views_test.go | 325 +++++++++++++++++++ 4 files changed, 1037 insertions(+) create mode 100644 weed/s3api/s3lifecycle/engine/hashes.go create mode 100644 weed/s3api/s3lifecycle/engine/hashes_test.go create mode 100644 weed/s3api/s3lifecycle/engine/views.go create mode 100644 weed/s3api/s3lifecycle/engine/views_test.go diff --git a/weed/s3api/s3lifecycle/engine/hashes.go b/weed/s3api/s3lifecycle/engine/hashes.go new file mode 100644 index 000000000..42815a1e5 --- /dev/null +++ b/weed/s3api/s3lifecycle/engine/hashes.go @@ -0,0 +1,205 @@ +package engine + +import ( + "bytes" + "crypto/sha256" + "encoding/binary" + "hash" + "sort" + "time" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" +) + +// hashItem pairs a ruleset member with its parent action so the +// sort+hash helpers below can be type-agnostic. Internal type — every +// hash callsite collects items into a slice, then hands it off. +type hashItem struct { + key s3lifecycle.ActionKey + action *CompiledAction +} + +// sortHashItems orders items by (RuleHash, ActionKind, Bucket). The +// composition matches the ActionKey identity model: RuleHash is the +// primary content-derived identifier, ActionKind disambiguates siblings +// of one rule, Bucket scopes by bucket so the same XML in two buckets +// hashes distinctly. Shared between ReplayContentHash and PromotedHash +// so the two helpers agree on the on-wire ordering — if one drifted, +// the cursor could see "rule changed" on a no-op snapshot rebuild. 
+func sortHashItems(items []hashItem) { + sort.Slice(items, func(i, j int) bool { + if c := bytes.Compare(items[i].key.RuleHash[:], items[j].key.RuleHash[:]); c != 0 { + return c < 0 + } + if items[i].key.ActionKind != items[j].key.ActionKind { + return items[i].key.ActionKind < items[j].key.ActionKind + } + return items[i].key.Bucket < items[j].key.Bucket + }) +} + +// hashWriter is the small varint-tagged writer the hash helpers use. +// Each field gets a one-byte tag so a future schema change (new field) +// can extend the on-wire format without colliding with existing values. +type hashWriter struct { + h hash.Hash + lenbuf [binary.MaxVarintLen64]byte +} + +func newHashWriter() *hashWriter { + return &hashWriter{h: sha256.New()} +} + +// writeField writes a length-prefixed byte field under tag. +func (w *hashWriter) writeField(tag byte, b []byte) { + _, _ = w.h.Write([]byte{tag}) + n := binary.PutUvarint(w.lenbuf[:], uint64(len(b))) + _, _ = w.h.Write(w.lenbuf[:n]) + _, _ = w.h.Write(b) +} + +// writeInt writes a varint-encoded signed integer under tag. +func (w *hashWriter) writeInt(tag byte, v int64) { + _, _ = w.h.Write([]byte{tag}) + n := binary.PutVarint(w.lenbuf[:], v) + _, _ = w.h.Write(w.lenbuf[:n]) +} + +func (w *hashWriter) sum() [32]byte { + var out [32]byte + copy(out[:], w.h.Sum(nil)) + return out +} + +// ReplayContentHash hashes the content (action kind, predicate, TTL value) +// of every replay-eligible compiled action in the base snapshot, returning +// the empty hash when no replay-eligible action exists. The hash is: +// - Partition-independent. A retention-driven scan_only promotion does +// NOT change this hash; only the dispatch path changes, not the rule +// content. (PromotedHash exists to catch partition flips separately.) +// - Stable across snapshot reorderings. Actions are pre-sorted by +// RuleHash + ActionKind + Bucket so two snapshots with the same rules +// compiled in any order hash identically. +// - Disabled-rule-aware. 
ModeDisabled actions are excluded so disabling +// a rule changes the hash (it changed the rule set the worker is +// scanning under). +// +// Used as cursor.RuleSetHash. A mismatch between persisted and current +// triggers the recovery branch on next daily_run. +func ReplayContentHash(s *Snapshot) [32]byte { + var empty [32]byte + if s == nil { + return empty + } + var items []hashItem + for k, a := range s.actions { + if a == nil || a.Mode == ModeDisabled { + continue + } + if !isReplayKind(k.ActionKind) { + continue + } + items = append(items, hashItem{key: k, action: a}) + } + if len(items) == 0 { + return empty + } + sortHashItems(items) + + w := newHashWriter() + for _, it := range items { + w.writeField(0x01, []byte(it.key.Bucket)) + w.writeField(0x02, it.key.RuleHash[:]) + w.writeInt(0x03, int64(it.key.ActionKind)) + // RuleHash already covers the predicate (Prefix + FilterTags + size + // filters) and per-kind TTLs, so we don't need to re-canonicalise + // the *Rule. But we also include the action's effective TTL + // directly so that an "effective TTL of 0" (a malformed rule where + // the kind doesn't match the populated field) is distinguishable + // from a valid one. + w.writeInt(0x04, int64(effectiveTTL(it.action))) + } + return w.sum() +} + +// PromotedHash hashes the set of replay-eligible actions that *would* land +// in walk (rather than replay) for the given retentionWindow, due to TTL > +// retentionWindow. Empty hash when no rules are promoted. Takes the SAME +// retentionWindow value as RulesForShard so the two helpers cannot disagree +// about partition membership. +// +// Detects partition flips in either direction: +// - replay → walk (retention dropped): rule appears in this hash but +// didn't before. +// - walk → replay (retention recovered): rule used to appear here but no +// longer does. +// +// In both cases the persisted hash differs from the freshly computed one, +// firing the recovery branch. 
+// +// A mismatch with the persisted PromotedHash triggers recovery even when +// rule content is unchanged. +func PromotedHash(s *Snapshot, retentionWindow time.Duration) [32]byte { + var empty [32]byte + if s == nil { + return empty + } + var items []hashItem + for k, a := range s.actions { + if a == nil || a.Mode == ModeDisabled { + continue + } + if !isReplayKind(k.ActionKind) { + continue + } + ttl := effectiveTTL(a) + // Mirror RulesForShard's partition predicate exactly: a replay + // kind lands in walk when ttl is 0 (malformed) or ttl > + // retentionWindow. PromotedHash hashes that walk-bound subset. + if ttl > 0 && ttl <= retentionWindow { + continue + } + items = append(items, hashItem{key: k, action: a}) + } + if len(items) == 0 { + return empty + } + sortHashItems(items) + + w := newHashWriter() + for _, it := range items { + w.writeField(0x01, []byte(it.key.Bucket)) + w.writeField(0x02, it.key.RuleHash[:]) + w.writeInt(0x03, int64(it.key.ActionKind)) + } + return w.sum() +} + +// MaxEffectiveTTL returns the maximum effective TTL across the *active* +// replay-eligible actions in s. Returns 0 for a nil snapshot or one with no +// active replay actions; the caller is expected to be in the empty-replay +// branch already (per the design's sentinel-cursor logic). +// +// "Effective TTL" mirrors the partition predicate in views.go: derived from +// the rule field that matches the action kind. Walker-only action kinds +// (ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent) contribute +// nothing — they're either not replay-eligible or, in a walk view, not +// active in the replay sense. 
+func MaxEffectiveTTL(s *Snapshot) time.Duration { + if s == nil { + return 0 + } + var max time.Duration + for k, a := range s.actions { + if a == nil || !a.IsActive() { + continue + } + if !isReplayKind(k.ActionKind) { + continue + } + if ttl := effectiveTTL(a); ttl > max { + max = ttl + } + } + return max +} diff --git a/weed/s3api/s3lifecycle/engine/hashes_test.go b/weed/s3api/s3lifecycle/engine/hashes_test.go new file mode 100644 index 000000000..ca849191c --- /dev/null +++ b/weed/s3api/s3lifecycle/engine/hashes_test.go @@ -0,0 +1,273 @@ +package engine + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReplayContentHash_EmptyWhenNoReplayActions(t *testing.T) { + var empty [32]byte + // Nil snapshot. + assert.Equal(t, empty, ReplayContentHash(nil)) + + // Walker-only rule: ExpirationDate compiles to a single ScanAtDate + // action which is excluded from ReplayContentHash. + when := time.Now().Add(24 * time.Hour) + rule := &s3lifecycle.Rule{ + ID: "d", + Status: s3lifecycle.StatusEnabled, + ExpirationDate: when, + } + snap := buildSnapshotForViews(t, "b1", rule) + assert.Equal(t, empty, ReplayContentHash(snap)) +} + +func TestReplayContentHash_PartitionIndependent(t *testing.T) { + // Critical invariant: a retention shift that promotes a rule from + // replay to walk (or back) MUST NOT change ReplayContentHash. The hash + // is over the rule content as it appears in the base snapshot, not + // over the partition it ends up in. + rule := &s3lifecycle.Rule{ + ID: "shift", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 30, + } + snap := buildSnapshotForViews(t, "b1", rule) + hashWithLargeRetention := ReplayContentHash(snap) + + // Whether retention is 1d (promotes to walk) or 365d (stays in + // replay), the base snapshot's rule content didn't move — only its + // partition did. 
ReplayContentHash hashes the base, so this is + // trivially constant. We re-call here to pin the contract. + _, walkSmall := snap.RulesForShard(0, s3lifecycle.DaysToDuration(1)) + require.NotNil(t, walkSmall, "precondition: 1d retention promotes 30d rule to walk") + replayLarge, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(365)) + require.NotNil(t, replayLarge, "precondition: 365d retention keeps 30d rule in replay") + + hashAgain := ReplayContentHash(snap) + assert.Equal(t, hashWithLargeRetention, hashAgain, + "ReplayContentHash must be partition-independent") +} + +func TestReplayContentHash_ChangesOnRuleContentEdit(t *testing.T) { + rule := &s3lifecycle.Rule{ + ID: "r", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 30, + } + snap1 := buildSnapshotForViews(t, "b1", rule) + h1 := ReplayContentHash(snap1) + + // Edit the TTL — RuleHash changes, so the action's key changes, and + // the content hash must move. + rule2 := &s3lifecycle.Rule{ + ID: "r", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 60, + } + snap2 := buildSnapshotForViews(t, "b1", rule2) + h2 := ReplayContentHash(snap2) + + assert.NotEqual(t, h1, h2, "TTL edit must change ReplayContentHash") +} + +func TestReplayContentHash_StableAcrossRuleReorder(t *testing.T) { + r1 := &s3lifecycle.Rule{ID: "a", Status: s3lifecycle.StatusEnabled, ExpirationDays: 7, Prefix: "a/"} + r2 := &s3lifecycle.Rule{ID: "b", Status: s3lifecycle.StatusEnabled, ExpirationDays: 14, Prefix: "b/"} + snapAB := buildSnapshotForViews(t, "b1", r1, r2) + snapBA := buildSnapshotForViews(t, "b1", r2, r1) + assert.Equal(t, ReplayContentHash(snapAB), ReplayContentHash(snapBA), + "reordering rules at compile time must not change ReplayContentHash") +} + +func TestReplayContentHash_ExcludesWalkerOnlyAndDisabled(t *testing.T) { + // Walker-only kinds and disabled rules don't affect ReplayContentHash; + // only replay-eligible action kinds count. 
+ replayRule := &s3lifecycle.Rule{ + ID: "rep", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 7, + } + snapOnlyReplay := buildSnapshotForViews(t, "b1", replayRule) + baseline := ReplayContentHash(snapOnlyReplay) + + walkerRule := &s3lifecycle.Rule{ + ID: "walker", + Status: s3lifecycle.StatusEnabled, + ExpiredObjectDeleteMarker: true, + } + disabledRule := &s3lifecycle.Rule{ + ID: "off", + Status: s3lifecycle.StatusDisabled, + ExpirationDays: 90, + } + snapMixed := buildSnapshotForViews(t, "b1", replayRule, walkerRule, disabledRule) + mixed := ReplayContentHash(snapMixed) + + assert.Equal(t, baseline, mixed, + "adding walker-only and disabled rules must not change ReplayContentHash") +} + +func TestPromotedHash_EmptyWhenNoneAreInWalk(t *testing.T) { + var empty [32]byte + assert.Equal(t, empty, PromotedHash(nil, 0)) + + rule := &s3lifecycle.Rule{ + ID: "fits", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 7, + } + snap := buildSnapshotForViews(t, "b1", rule) + // Large retention → rule stays in replay → no promotion. + assert.Equal(t, empty, PromotedHash(snap, s3lifecycle.DaysToDuration(365))) +} + +func TestPromotedHash_ChangesOnReplayToWalkPromotion(t *testing.T) { + // Retention drops below the rule's TTL → rule promotes from replay to + // walk → PromotedHash changes from empty to non-empty. 
+ rule := &s3lifecycle.Rule{ + ID: "long", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 30, + } + snap := buildSnapshotForViews(t, "b1", rule) + + before := PromotedHash(snap, s3lifecycle.DaysToDuration(365)) // replay + after := PromotedHash(snap, s3lifecycle.DaysToDuration(7)) // promoted to walk + + var empty [32]byte + assert.Equal(t, empty, before) + assert.NotEqual(t, empty, after) + assert.NotEqual(t, before, after, "promotion must change PromotedHash") +} + +func TestPromotedHash_ChangesOnWalkToReplayDemotion(t *testing.T) { + // Retention recovers from < TTL to >= TTL → rule demotes from walk to + // replay → PromotedHash changes from non-empty to empty. + rule := &s3lifecycle.Rule{ + ID: "demo", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 30, + } + snap := buildSnapshotForViews(t, "b1", rule) + + before := PromotedHash(snap, s3lifecycle.DaysToDuration(7)) // walk + after := PromotedHash(snap, s3lifecycle.DaysToDuration(365)) // demoted back to replay + + var empty [32]byte + assert.NotEqual(t, empty, before) + assert.Equal(t, empty, after) + assert.NotEqual(t, before, after, "demotion must change PromotedHash") +} + +func TestPromotedHash_StableWhenContentUnchangedAndPartitionStays(t *testing.T) { + // Same snapshot + same retentionWindow → same PromotedHash. + rule := &s3lifecycle.Rule{ + ID: "stable", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 30, + } + snap := buildSnapshotForViews(t, "b1", rule) + rw := s3lifecycle.DaysToDuration(7) // promoted + assert.Equal(t, PromotedHash(snap, rw), PromotedHash(snap, rw)) +} + +func TestPromotedHash_MatchesRulesForShardWalkMembership(t *testing.T) { + // PromotedHash must agree with RulesForShard about which replay- + // eligible actions are in walk for the same retentionWindow. 
+ rules := []*s3lifecycle.Rule{ + {ID: "short", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1}, + {ID: "long", Status: s3lifecycle.StatusEnabled, ExpirationDays: 90}, + {ID: "noncurrent", Status: s3lifecycle.StatusEnabled, NoncurrentVersionExpirationDays: 200}, + } + snap := buildSnapshotForViews(t, "b1", rules...) + rw := s3lifecycle.DaysToDuration(30) + + replay, walk := snap.RulesForShard(0, rw) + require.NotNil(t, replay) + require.NotNil(t, walk) + + // Expected walk members (from replay-eligible kinds only): long, noncurrent. + wantPromoted := map[s3lifecycle.ActionKey]struct{}{} + for k := range walk.actions { + if isReplayKind(k.ActionKind) { + wantPromoted[k] = struct{}{} + } + } + assert.Len(t, wantPromoted, 2, "two of three replay-eligible rules should promote") + + hash := PromotedHash(snap, rw) + var empty [32]byte + assert.NotEqual(t, empty, hash) +} + +func TestMaxEffectiveTTL_NilAndEmpty(t *testing.T) { + assert.Equal(t, time.Duration(0), MaxEffectiveTTL(nil)) + + // Snapshot with only walker-only kinds → no active replay action → + // MaxEffectiveTTL returns 0. + rule := &s3lifecycle.Rule{ + ID: "walk", + Status: s3lifecycle.StatusEnabled, + ExpiredObjectDeleteMarker: true, + } + snap := buildSnapshotForViews(t, "b1", rule) + assert.Equal(t, time.Duration(0), MaxEffectiveTTL(snap)) +} + +func TestMaxEffectiveTTL_ReturnsMaxAcrossActiveReplay(t *testing.T) { + // Three replay-eligible actions with mixed TTLs; MaxEffectiveTTL picks + // the largest. + rule := &s3lifecycle.Rule{ + ID: "mix", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 10, + NoncurrentVersionExpirationDays: 60, + AbortMPUDaysAfterInitiation: 3, + } + snap := buildSnapshotForViews(t, "b1", rule) + got := MaxEffectiveTTL(snap) + assert.Equal(t, s3lifecycle.DaysToDuration(60), got) +} + +func TestMaxEffectiveTTL_OperatesOnReplayView(t *testing.T) { + // The caller passes the *replay* snapshot here in production. 
Verify + // that on a replay view the answer is the max over the view's + // (active) replay actions and excludes anything that was routed away + // to walk via scan_only promotion. + rule := &s3lifecycle.Rule{ + ID: "mix", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 10, + NoncurrentVersionExpirationDays: 200, // would promote at small retention + } + snap := buildSnapshotForViews(t, "b1", rule) + // Retention 30d: ExpirationDays(10) stays in replay; NoncurrentDays(200) + // promotes to walk. The replay view's MaxEffectiveTTL is therefore 10d. + replay, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30)) + require.NotNil(t, replay) + assert.Equal(t, s3lifecycle.DaysToDuration(10), MaxEffectiveTTL(replay)) +} + +func TestMaxEffectiveTTL_IgnoresInactiveActions(t *testing.T) { + // Pre-BootstrapComplete event-driven action is in the snapshot but + // inactive. MaxEffectiveTTL only counts active actions. + rule := &s3lifecycle.Rule{ + ID: "pending", + Status: s3lifecycle.StatusEnabled, + ExpirationDays: 30, + } + snap := New().Compile( + []CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}}, + CompileOptions{}, // no PriorState → inactive + ) + for _, a := range snap.actions { + require.False(t, a.IsActive(), "precondition") + } + assert.Equal(t, time.Duration(0), MaxEffectiveTTL(snap), + "inactive replay actions must not contribute to MaxEffectiveTTL") +} diff --git a/weed/s3api/s3lifecycle/engine/views.go b/weed/s3api/s3lifecycle/engine/views.go new file mode 100644 index 000000000..7770ca668 --- /dev/null +++ b/weed/s3api/s3lifecycle/engine/views.go @@ -0,0 +1,234 @@ +package engine + +import ( + "bytes" + "sort" + "sync/atomic" + "time" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" +) + +// currentEngine is the package-level pointer the wiring will eventually +// populate so daily_run can fetch the live snapshot through CurrentSnapshot. 
+// Today the worker/scheduler owns its own *Engine and passes the snapshot +// explicitly to its consumers; nothing calls SetCurrentEngine yet. Phase 4's +// daily-replay caller will register the worker's engine here on startup so +// the package-level helper resolves without threading the *Engine through +// every layer. +// +// TODO(phase4-wiring): hook scheduler/worker startup to call SetCurrentEngine. +var currentEngine atomic.Pointer[Engine] + +// SetCurrentEngine installs e as the package-level engine returned by +// CurrentSnapshot. Intended for the worker bootstrap path; tests may also +// call it with a locally-constructed engine. Passing nil clears the pointer. +func SetCurrentEngine(e *Engine) { currentEngine.Store(e) } + +// CurrentSnapshot returns the latest *Snapshot from the package-level engine, +// or nil if no engine has been registered yet. The caller is responsible for +// nil-checking the return — a missing engine is treated as "no rules" so the +// daily-run loop short-circuits cleanly instead of panicking during early +// startup. +// +// TODO(phase4-wiring): SetCurrentEngine must be invoked at worker startup +// for production callers; until then this returns nil. +func CurrentSnapshot() *Snapshot { + e := currentEngine.Load() + if e == nil { + return nil + } + return e.Snapshot() +} + +// cloneAction makes an independent *CompiledAction for a per-view snapshot. +// The clone shares Rule (immutable) and Key (value) with the base, but its +// engineState (the active bit) is its own atomic — so flipping active on the +// clone never reaches the base or any sibling view. Mode is set explicitly by +// the caller to encode the per-view dispatch contract (replay forces +// ModeEventDriven so router.Route's gate passes; walk and recovery preserve +// the base Mode so the walker's "any non-Disabled" gate works as-is). 
+func cloneAction(src *CompiledAction, mode RuleMode, active bool) *CompiledAction { + if src == nil { + return nil + } + dst := &CompiledAction{ + Rule: src.Rule, + Bucket: src.Bucket, + Key: src.Key, + Delay: src.Delay, + PredicateSensitive: src.PredicateSensitive, + Mode: mode, + } + if active { + dst.markActive() + } + return dst +} + +// newView constructs a *Snapshot that holds the supplied per-view action +// clones but otherwise shares the base's immutable rule / index data by +// pointer (see DESIGN.md "router.Route integration" — only `active` and +// `Mode` differ per view; everything else is share-by-pointer). +// +// Returns nil if actions is empty: callers consume nil as "this partition +// has no actions, skip the corresponding dispatch path." +func newView(base *Snapshot, actions map[s3lifecycle.ActionKey]*CompiledAction) *Snapshot { + if len(actions) == 0 { + return nil + } + // allActionsSorted mirrors the base order: same comparator (Bucket, + // RuleHash, ActionKind) so downstream iteration order is stable per view. + sorted := make([]*CompiledAction, 0, len(actions)) + for _, a := range actions { + sorted = append(sorted, a) + } + sort.Slice(sorted, func(i, j int) bool { + a, b := sorted[i], sorted[j] + if a.Bucket != b.Bucket { + return a.Bucket < b.Bucket + } + if c := bytes.Compare(a.Key.RuleHash[:], b.Key.RuleHash[:]); c != 0 { + return c < 0 + } + return a.Key.ActionKind < b.Key.ActionKind + }) + return &Snapshot{ + id: base.id, + buckets: base.buckets, + actions: actions, + allActionsSorted: sorted, + originalDelayGroups: base.originalDelayGroups, + predicateActions: base.predicateActions, + dateActions: base.dateActions, + } +} + +// isReplayKind reports whether ActionKind is a candidate for the replay +// partition: its trigger derives from a single event's TsNs (or a stamped +// noncurrent_since), so the meta-log scan can drive dispatch directly. 
+// ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent are walker-only +// because their due time depends on current bucket state, not event age. +func isReplayKind(k s3lifecycle.ActionKind) bool { + switch k { + case s3lifecycle.ActionKindExpirationDays, + s3lifecycle.ActionKindNoncurrentDays, + s3lifecycle.ActionKindAbortMPU: + return true + } + return false +} + +// effectiveTTL is the duration used both for partition membership +// (TTL ≤ retentionWindow → replay) and MaxEffectiveTTL math (driving the +// cursor's sliding scan window). Walker-only kinds return 0 so they never +// contribute to MaxEffectiveTTL and never satisfy the partition gate when +// retentionWindow is finite. +func effectiveTTL(a *CompiledAction) time.Duration { + if a == nil || a.Rule == nil { + return 0 + } + switch a.Key.ActionKind { + case s3lifecycle.ActionKindExpirationDays: + if a.Rule.ExpirationDays > 0 { + return s3lifecycle.DaysToDuration(a.Rule.ExpirationDays) + } + case s3lifecycle.ActionKindNoncurrentDays: + if a.Rule.NoncurrentVersionExpirationDays > 0 { + return s3lifecycle.DaysToDuration(a.Rule.NoncurrentVersionExpirationDays) + } + case s3lifecycle.ActionKindAbortMPU: + if a.Rule.AbortMPUDaysAfterInitiation > 0 { + return s3lifecycle.DaysToDuration(a.Rule.AbortMPUDaysAfterInitiation) + } + } + return 0 +} + +// RulesForShard partitions the base snapshot's compiled actions into a +// replay view and a walk view. +// +// shardID is **reserved for forward-compatibility** and not consumed by +// this implementation. Every shard sees every rule today because the +// shard filter runs at the entry-iteration site (meta-log subscription +// per shard, walker entry-loop ShardID check) rather than at view +// construction. Keeping the parameter in the signature preserves the +// API surface for a future move to per-shard rule sets (e.g. when +// bucket-shard ownership is added) — removing it now would be a +// breaking change at that future point. 
See DESIGN.md "open questions +// → per-shard rule snapshots vs. global." +// +// retentionWindow is the only window input the engine sees; the +// daily_run caller computes it as `now - earliest_available` and +// passes it in so the partition is stable across one daily_run +// invocation. +// +// Membership: +// - replay: clones of ExpirationDays / NoncurrentDays / AbortMPU actions +// whose TTL ≤ retentionWindow. active=true, Mode forced to +// ModeEventDriven (rehabilitating any prior ModeScanOnly lock-in so +// router.Route's gate accepts the clone). +// - walk: clones of ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent +// actions, plus any replay-eligible action whose TTL exceeds +// retentionWindow (the "scan_only promotion" path). active=true, Mode +// preserved from the base — the walker only rejects ModeDisabled. +// +// Either return value may be nil when its partition has no actions. +// Disabled actions (Mode == ModeDisabled) are excluded from both views. +func (s *Snapshot) RulesForShard(shardID int, retentionWindow time.Duration) (replay, walk *Snapshot) { + _ = shardID + if s == nil { + return nil, nil + } + replayActions := map[s3lifecycle.ActionKey]*CompiledAction{} + walkActions := map[s3lifecycle.ActionKey]*CompiledAction{} + for key, a := range s.actions { + if a == nil || a.Mode == ModeDisabled { + continue + } + if isReplayKind(key.ActionKind) { + ttl := effectiveTTL(a) + // retentionWindow == 0 means "no retention info supplied" — the + // caller should treat the partition as walk-only to stay safe, + // since promoting every rule into replay without retention + // proof would defeat the scan_only protection. ttl == 0 is a + // malformed rule (kind says replay but no TTL): also routes to + // walk so the walker can decide. 
+ if ttl > 0 && ttl <= retentionWindow { + replayActions[key] = cloneAction(a, ModeEventDriven, true) + } else { + walkActions[key] = cloneAction(a, a.Mode, true) + } + continue + } + // Walker-only kinds: preserve Mode (ModeScanAtDate for + // ExpirationDate, ModeEventDriven for the others) so the walker's + // per-rule evaluator runs as today. + walkActions[key] = cloneAction(a, a.Mode, true) + } + return newView(s, replayActions), newView(s, walkActions) +} + +// RecoveryView returns a *Snapshot that contains a clone of every action in +// the base, with active=true regardless of compile-time IsActive/ +// BootstrapComplete and Mode preserved as-is. The recovery walker uses this +// view to catch already-due objects across the full rule set on cold start, +// rule edit, retention loss, or partition flip. Disabled actions stay out — +// recovery shouldn't resurrect an explicitly disabled rule. +// +// Unlike RulesForShard, RecoveryView takes no retentionWindow: every +// non-disabled action is activated, partition-independent, because recovery +// must reach rules whose subjects sit outside the replay scan window. 
+func RecoveryView(s *Snapshot) *Snapshot {
+	if s == nil {
+		return nil
+	}
+	actions := make(map[s3lifecycle.ActionKey]*CompiledAction, len(s.actions)) // upper bound: nil/disabled entries are skipped
+	for key, a := range s.actions {
+		if a == nil || a.Mode == ModeDisabled {
+			continue
+		}
+		actions[key] = cloneAction(a, a.Mode, true) // clone keeps the base Mode and is force-activated
+	}
+	return newView(s, actions)
+}
diff --git a/weed/s3api/s3lifecycle/engine/views_test.go b/weed/s3api/s3lifecycle/engine/views_test.go
new file mode 100644
index 000000000..7c2a9973b
--- /dev/null
+++ b/weed/s3api/s3lifecycle/engine/views_test.go
@@ -0,0 +1,334 @@
+package engine
+
+import (
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// buildSnapshotForViews compiles a snapshot with a single bucket containing
+// the supplied rules, marking every action bootstrap_complete so the base
+// actions are active. Tests can then call RulesForShard and assert the
+// per-view clones come out independent.
+func buildSnapshotForViews(t *testing.T, bucket string, rules ...*s3lifecycle.Rule) *Snapshot {
+	t.Helper()
+	prior := map[s3lifecycle.ActionKey]PriorState{}
+	for _, r := range rules {
+		rh := s3lifecycle.RuleHash(r)
+		for _, k := range s3lifecycle.RuleActionKinds(r) {
+			prior[s3lifecycle.ActionKey{Bucket: bucket, RuleHash: rh, ActionKind: k}] = PriorState{
+				BootstrapComplete: true,
+			}
+		}
+	}
+	return New().Compile(
+		[]CompileInput{{Bucket: bucket, Rules: rules}},
+		CompileOptions{PriorStates: prior},
+	)
+}
+
+func TestCurrentSnapshot_NilWhenNoEngineRegistered(t *testing.T) {
+	// Save and restore the package-level pointer so tests stay isolated.
+	prev := currentEngine.Load()
+	t.Cleanup(func() { currentEngine.Store(prev) })
+
+	SetCurrentEngine(nil)
+	assert.Nil(t, CurrentSnapshot())
+}
+
+func TestCurrentSnapshot_ReturnsRegisteredEngineSnapshot(t *testing.T) {
+	// CurrentSnapshot must return the exact *Snapshot produced by the
+	// registered engine's Compile call (pointer identity), not a copy.
+	prev := currentEngine.Load()
+	t.Cleanup(func() { currentEngine.Store(prev) })
+
+	e := New()
+	rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 7}
+	snap := e.Compile([]CompileInput{{Bucket: "b", Rules: []*s3lifecycle.Rule{rule}}}, CompileOptions{})
+	SetCurrentEngine(e)
+	assert.Same(t, snap, CurrentSnapshot())
+}
+
+func TestRulesForShard_NilSnapshot(t *testing.T) {
+	// A nil *Snapshot receiver must return (nil, nil) rather than panic.
+	var s *Snapshot
+	replay, walk := s.RulesForShard(0, 30*24*time.Hour)
+	assert.Nil(t, replay)
+	assert.Nil(t, walk)
+}
+
+func TestRulesForShard_ReplayPartitionMembership(t *testing.T) {
+	// ExpirationDays / NoncurrentDays / AbortMPU below retentionWindow all
+	// land in replay; the replay map carries clones with active=true and
+	// Mode=ModeEventDriven.
+	rule := &s3lifecycle.Rule{
+		ID:                              "all-replay",
+		Status:                          s3lifecycle.StatusEnabled,
+		ExpirationDays:                  3,
+		NoncurrentVersionExpirationDays: 5,
+		AbortMPUDaysAfterInitiation:     2,
+	}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
+	require.NotNil(t, replay, "all three replay-eligible kinds should land in replay")
+	assert.Nil(t, walk, "no walker kinds, no scan_only promotion, walk should be empty")
+	assert.Len(t, replay.actions, 3)
+	for k, a := range replay.actions {
+		assert.True(t, a.IsActive(), "replay clone must be active for kind %v", k.ActionKind)
+		assert.Equal(t, ModeEventDriven, a.Mode, "replay Mode must be ModeEventDriven for kind %v", k.ActionKind)
+	}
+}
+
+func TestRulesForShard_WalkOnlyKindsRouteToWalk(t *testing.T) {
+	// ExpirationDate, ExpiredObjectDeleteMarker, NewerNoncurrent are walker-
+	// only and must land in walk regardless of retentionWindow.
+	when := time.Now().Add(24 * time.Hour)
+	rule := &s3lifecycle.Rule{
+		ID:                        "walker-mix",
+		Status:                    s3lifecycle.StatusEnabled,
+		ExpirationDate:            when,
+		ExpiredObjectDeleteMarker: true,
+		NewerNoncurrentVersions:   2,
+	}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
+	assert.Nil(t, replay, "walker-only rule has no replay partition")
+	require.NotNil(t, walk)
+	assert.Len(t, walk.actions, 3, "all three walker kinds should land in walk")
+	for _, a := range walk.actions {
+		assert.True(t, a.IsActive())
+	}
+}
+
+func TestRulesForShard_MultiActionXMLRuleSplits(t *testing.T) {
+	// A single XML rule that compiles to ExpirationDays + NewerNoncurrent
+	// must split: ExpirationDays → replay, NewerNoncurrent → walk.
+	rule := &s3lifecycle.Rule{
+		ID:                      "split",
+		Status:                  s3lifecycle.StatusEnabled,
+		ExpirationDays:          3,
+		NewerNoncurrentVersions: 5,
+	}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
+	require.NotNil(t, replay)
+	require.NotNil(t, walk)
+	assert.Len(t, replay.actions, 1)
+	assert.Len(t, walk.actions, 1)
+	for k := range replay.actions {
+		assert.Equal(t, s3lifecycle.ActionKindExpirationDays, k.ActionKind)
+	}
+	for k := range walk.actions {
+		assert.Equal(t, s3lifecycle.ActionKindNewerNoncurrent, k.ActionKind)
+	}
+}
+
+func TestRulesForShard_ScanOnlyPromotionRoutesToWalk(t *testing.T) {
+	// A replay-eligible kind whose TTL exceeds retentionWindow promotes to
+	// walk. The clone's active=true is set; Mode is preserved as-is from
+	// the base (the walker only rejects ModeDisabled).
+	rule := &s3lifecycle.Rule{
+		ID:                          "long",
+		Status:                      s3lifecycle.StatusEnabled,
+		ExpirationDays:              90,
+		AbortMPUDaysAfterInitiation: 1,
+	}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	// Retention window of 30 days promotes ExpirationDays(90d) to walk;
+	// AbortMPU(1d) stays in replay.
+	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
+	require.NotNil(t, replay)
+	require.NotNil(t, walk)
+	assert.Len(t, replay.actions, 1)
+	assert.Len(t, walk.actions, 1)
+	for k, a := range replay.actions {
+		assert.Equal(t, s3lifecycle.ActionKindAbortMPU, k.ActionKind)
+		assert.Equal(t, ModeEventDriven, a.Mode)
+	}
+	for k := range walk.actions {
+		assert.Equal(t, s3lifecycle.ActionKindExpirationDays, k.ActionKind)
+	}
+}
+
+func TestRulesForShard_ReplayRewritesScanOnlyMode(t *testing.T) {
+	// A base action carrying Mode=ModeScanOnly via PriorState must still
+	// land in replay as ModeEventDriven once retention rehabilitates its
+	// TTL. router.Route gates on ModeEventDriven, so this is the
+	// rehabilitation contract.
+	rule := &s3lifecycle.Rule{
+		ID:             "rehab",
+		Status:         s3lifecycle.StatusEnabled,
+		ExpirationDays: 5,
+	}
+	rh := s3lifecycle.RuleHash(rule)
+	key := s3lifecycle.ActionKey{Bucket: "b1", RuleHash: rh, ActionKind: s3lifecycle.ActionKindExpirationDays}
+	snap := New().Compile(
+		[]CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}},
+		CompileOptions{
+			PriorStates: map[s3lifecycle.ActionKey]PriorState{
+				// Persistent ModeScanOnly carried over from a prior compile.
+				key: {BootstrapComplete: true, Mode: ModeScanOnly},
+			},
+		},
+	)
+	// Verify the base really did keep ModeScanOnly (Compile preserves it).
+	require.Equal(t, ModeScanOnly, snap.actions[key].Mode)
+
+	replay, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
+	require.NotNil(t, replay)
+	clone := replay.actions[key]
+	require.NotNil(t, clone)
+	assert.Equal(t, ModeEventDriven, clone.Mode, "replay must force ModeEventDriven on the clone")
+	assert.True(t, clone.IsActive())
+}
+
+func TestRulesForShard_DisabledRuleExcludedFromBothViews(t *testing.T) {
+	// A StatusDisabled rule must produce neither a replay nor a walk view.
+	rule := &s3lifecycle.Rule{ID: "off", Status: s3lifecycle.StatusDisabled, ExpirationDays: 7}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
+	assert.Nil(t, replay)
+	assert.Nil(t, walk)
+}
+
+func TestRulesForShard_ClonesAreIndependentOfBase(t *testing.T) {
+	// Toggling active on a view clone must never propagate to the base
+	// action — that's the whole point of cloning per view.
+	rule := &s3lifecycle.Rule{ID: "iso", Status: s3lifecycle.StatusEnabled, ExpirationDays: 3}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	key := s3lifecycle.ActionKey{
+		Bucket:     "b1",
+		RuleHash:   s3lifecycle.RuleHash(rule),
+		ActionKind: s3lifecycle.ActionKindExpirationDays,
+	}
+	base := snap.Action(key)
+	require.NotNil(t, base)
+	baseActiveBefore := base.IsActive()
+
+	replay, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
+	require.NotNil(t, replay)
+	clone := replay.actions[key]
+	require.NotNil(t, clone)
+	assert.NotSame(t, base, clone, "clone must be a distinct *CompiledAction")
+
+	// Force the clone inactive (write directly to its atomic so we exercise
+	// the active bit boundary, not just markActive's setter).
+	clone.engineState.Store(engineStateInactive)
+	assert.False(t, clone.IsActive())
+	assert.Equal(t, baseActiveBefore, base.IsActive(),
+		"toggling the clone must not perturb the base's active bit")
+}
+
+func TestRulesForShard_ZeroRetentionRoutesAllReplayKindsToWalk(t *testing.T) {
+	// retentionWindow == 0 (no proven retention) must route every replay-
+	// eligible action into walk — the partition is "ttl > 0 && ttl <=
+	// retentionWindow", and 0 fails the upper bound. This is the safe
+	// default before the meta-log horizon is known.
+	rule := &s3lifecycle.Rule{
+		ID:             "any",
+		Status:         s3lifecycle.StatusEnabled,
+		ExpirationDays: 1,
+	}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	replay, walk := snap.RulesForShard(0, 0)
+	assert.Nil(t, replay)
+	require.NotNil(t, walk)
+	assert.Len(t, walk.actions, 1)
+}
+
+func TestRecoveryView_ActivatesEveryNonDisabledAction(t *testing.T) {
+	// Base snapshot has no PriorState → event-driven actions land inactive
+	// (pre-BootstrapComplete). Recovery view must clone them with active=
+	// true so the recovery walker emits matches for them.
+	rule := &s3lifecycle.Rule{
+		ID:             "pending",
+		Status:         s3lifecycle.StatusEnabled,
+		ExpirationDays: 7,
+	}
+	snap := New().Compile(
+		[]CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}},
+		CompileOptions{}, // no PriorStates → action lands inactive
+	)
+	key := s3lifecycle.ActionKey{
+		Bucket:     "b1",
+		RuleHash:   s3lifecycle.RuleHash(rule),
+		ActionKind: s3lifecycle.ActionKindExpirationDays,
+	}
+	require.False(t, snap.actions[key].IsActive(), "precondition: base action must be inactive")
+
+	rv := RecoveryView(snap)
+	require.NotNil(t, rv)
+	clone := rv.actions[key]
+	require.NotNil(t, clone)
+	assert.True(t, clone.IsActive(), "recovery view must force-activate even pre-BootstrapComplete actions")
+	assert.Equal(t, ModeEventDriven, clone.Mode, "recovery preserves base Mode")
+}
+
+func TestRecoveryView_PreservesBaseModeForScanAtDate(t *testing.T) {
+	// ScanAtDate actions are active at compile time (no BootstrapComplete
+	// requirement). Recovery view must keep their Mode as ModeScanAtDate so
+	// the walker's date evaluator runs as expected — a Mode rewrite here
+	// would re-route them through the event-driven gate by mistake.
+	when := time.Now().Add(48 * time.Hour)
+	rule := &s3lifecycle.Rule{
+		ID:             "d",
+		Status:         s3lifecycle.StatusEnabled,
+		ExpirationDate: when,
+	}
+	snap := New().Compile(
+		[]CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}},
+		CompileOptions{},
+	)
+	rv := RecoveryView(snap)
+	require.NotNil(t, rv)
+	require.Len(t, rv.actions, 1)
+	for _, a := range rv.actions {
+		assert.Equal(t, ModeScanAtDate, a.Mode)
+		assert.True(t, a.IsActive())
+	}
+}
+
+func TestRecoveryView_ExcludesDisabledRules(t *testing.T) {
+	rule := &s3lifecycle.Rule{
+		ID:             "off",
+		Status:         s3lifecycle.StatusDisabled,
+		ExpirationDays: 7,
+	}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	rv := RecoveryView(snap)
+	assert.Nil(t, rv, "disabled rules must not surface in recovery view")
+}
+
+func TestRecoveryView_ClonesAreIndependentOfBase(t *testing.T) {
+	// Mirrors TestRulesForShard_ClonesAreIndependentOfBase for the recovery
+	// view: mutating a recovery clone must leave the base action untouched.
+	rule := &s3lifecycle.Rule{ID: "iso", Status: s3lifecycle.StatusEnabled, ExpirationDays: 3}
+	snap := buildSnapshotForViews(t, "b1", rule)
+	key := s3lifecycle.ActionKey{
+		Bucket:     "b1",
+		RuleHash:   s3lifecycle.RuleHash(rule),
+		ActionKind: s3lifecycle.ActionKindExpirationDays,
+	}
+	base := snap.Action(key)
+	require.NotNil(t, base)
+	baseActiveBefore := base.IsActive()
+
+	rv := RecoveryView(snap)
+	require.NotNil(t, rv)
+	clone := rv.actions[key]
+	require.NotNil(t, clone)
+	assert.NotSame(t, base, clone)
+
+	clone.engineState.Store(engineStateInactive)
+	assert.False(t, clone.IsActive())
+	assert.Equal(t, baseActiveBefore, base.IsActive(),
+		"flipping the recovery clone must not perturb the base")
+}
+
+func TestRecoveryView_NilSnapshot(t *testing.T) {
+	// A nil snapshot yields a nil recovery view.
+	assert.Nil(t, RecoveryView(nil))
+}