feat(s3/lifecycle/engine): daily-replay view surface (Phase 4 engine) (#9447)

* feat(s3/lifecycle/engine): daily-replay view surface (Phase 4 engine)

Adds the engine-side API the new daily-replay worker reaches for:
per-view snapshot construction (RulesForShard, RecoveryView), the two
cursor hashes that gate recovery (ReplayContentHash, PromotedHash),
and the cursor sliding-window helper (MaxEffectiveTTL). CurrentSnapshot
is a stub backed by a package-level atomic pointer that the worker startup
wiring will populate; until then it returns nil.

Views return new *Snapshot instances holding cloned *CompiledAction
values so per-clone active/Mode never leak across partitions. Replay
clones force Mode=ModeEventDriven to rehabilitate any persistent
ModeScanOnly carried over from PriorState; walk and recovery clones
preserve Mode as-is. Disabled actions are excluded from all views.
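The clone-isolation property described above can be sketched in a few lines. This is a minimal illustration, not the engine's code: the `action` type, the `mode` constants, and `cloneFor` are hypothetical stand-ins for *CompiledAction, RuleMode, and cloneAction, assuming only that the active bit is an atomic owned by each clone while the rule is shared by pointer.

```go
package main

import "sync/atomic"

// mode is a hypothetical stand-in for the engine's RuleMode.
type mode int

const (
	modeEventDriven mode = iota
	modeScanOnly
)

// action is a simplified stand-in for *CompiledAction: a shared, immutable
// rule pointer plus a per-view mode and a per-view atomic active bit.
type action struct {
	rule   *string
	mode   mode
	active atomic.Bool
}

// cloneFor builds an independent action for one view. The rule pointer is
// shared with src; mode and the active bit belong to the clone alone, so
// changing them never reaches the base or a sibling view.
func cloneFor(src *action, m mode, active bool) *action {
	dst := &action{rule: src.rule, mode: m}
	dst.active.Store(active)
	return dst
}
```

Under these assumptions, cloning an inactive modeScanOnly base with `cloneFor(base, modeEventDriven, true)` yields an active event-driven clone while the base stays inactive and keeps modeScanOnly.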

No production caller is wired here — Phase 4's walker/dailyrun
integration is the follow-up. dailyrun's local helpers
(localReplayContentHash, localMaxEffectiveTTL) become one-line
redirects to these exports.

API surface:
- CurrentSnapshot() *Snapshot — stub until Phase 4 wiring.
- SetCurrentEngine(*Engine) — Phase 4 wiring entry point.
- Snapshot.RulesForShard(shardID, retentionWindow) (replay, walk *Snapshot)
- RecoveryView(s *Snapshot) *Snapshot — force-active over the full set.
- ReplayContentHash(s *Snapshot) [32]byte — partition-independent.
- PromotedHash(s *Snapshot, retentionWindow) [32]byte — partition-flip.
- MaxEffectiveTTL(s *Snapshot) time.Duration — over active replay only.
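As a rough illustration of how the partition and the sliding-window helper relate, the sketch below applies the same predicate (0 < TTL <= retentionWindow stays in replay, everything else goes to walk) to a plain map. The names `partition`, `maxTTL`, `exampleTTLs`, and `day` are hypothetical and exist only for this example; the real API operates on *Snapshot.

```go
package main

import (
	"sort"
	"time"
)

// day is a convenience unit for the example TTLs.
const day = 24 * time.Hour

// exampleTTLs returns hypothetical replay-eligible actions keyed by name.
func exampleTTLs() map[string]time.Duration {
	return map[string]time.Duration{
		"short":      1 * day,
		"long":       90 * day,
		"noncurrent": 200 * day,
	}
}

// partition splits actions by the predicate stated in the commit: an action
// stays in replay only when 0 < ttl <= retentionWindow. A zero TTL or a
// zero retention window therefore always lands in walk.
func partition(ttls map[string]time.Duration, retentionWindow time.Duration) (replay, walk []string) {
	for name, ttl := range ttls {
		if ttl > 0 && ttl <= retentionWindow {
			replay = append(replay, name)
		} else {
			walk = append(walk, name)
		}
	}
	// Map iteration order is random; sort so the result is deterministic.
	sort.Strings(replay)
	sort.Strings(walk)
	return replay, walk
}

// maxTTL returns the largest TTL among the named actions, or 0 if none.
func maxTTL(ttls map[string]time.Duration, names []string) time.Duration {
	var out time.Duration
	for _, n := range names {
		if ttl := ttls[n]; ttl > out {
			out = ttl
		}
	}
	return out
}
```

With a 30-day window, only "short" remains in replay, so the maximum TTL over the replay side is one day; with a window of 0, every action routes to walk.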

30 unit tests cover clone isolation, Mode rewrite, partition
membership including the multi-action-kind XML rule split,
RecoveryView activating pre-BootstrapComplete actions,
ReplayContentHash partition-independence, PromotedHash sensitivity to
promotion in either direction, and MaxEffectiveTTL aggregation. Build +
race-tests green.
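The way the two hashes gate recovery can be sketched as a simple comparison. This is an assumption-laden illustration: the `cursor` struct and `needsRecovery` are hypothetical, modeled only on the doc comments' statements that ReplayContentHash is stored as cursor.RuleSetHash and that a mismatch in either persisted hash fires the recovery branch.

```go
package main

// cursor is a hypothetical persisted record holding the two gating hashes.
type cursor struct {
	RuleSetHash  [32]byte // last seen ReplayContentHash
	PromotedHash [32]byte // last seen PromotedHash
}

// needsRecovery reports whether either hash moved since the cursor was
// persisted. A rule-content edit changes the first hash; a partition flip
// in either direction (replay to walk, or walk to replay) changes the second.
func needsRecovery(persisted cursor, replayContent, promoted [32]byte) bool {
	return persisted.RuleSetHash != replayContent ||
		persisted.PromotedHash != promoted
}
```

Because Go arrays are comparable, the all-zero [32]byte works as the "nothing promoted" sentinel without any special casing.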

* refactor(s3/lifecycle/engine): consolidate hash helpers; clarify shardID semantics

Addresses PR #9447 review feedback. Three medium-priority items from
gemini, all code-quality refinements (no behavior change):

1. Duplicated sort comparator between ReplayContentHash and
   PromotedHash. Extract sortHashItems shared helper so the two
   hashes use the same ordering by construction — if one drifted, the
   cursor could see a spurious "rule changed" on a no-op snapshot
   rebuild.

2. Duplicated writeField/writeInt closures. Extract hashWriter struct
   holding the sha256 running hash + lenbuf, with method helpers.
   Same allocation profile (one Hash, one tiny stack buffer per
   helper); just deduplicates ~20 lines.

3. shardID parameter on RulesForShard is unused. Per the design's
   open question, every shard sees every rule today (shard filter
   runs at the entry-iteration site, not view construction). Keep
   the parameter for API stability — removing it now would force
   a breaking change when bucket-shard ownership lands — and update
   the doc comment to explain why it's reserved.
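Items 1 and 2 can be illustrated together. The sketch below is a simplified approximation, not the code in this commit: `item`, `sortItems`, `writer`, and `digest` are hypothetical names, and ruleHash is a string here rather than a [32]byte. It shows why one shared comparator makes the digest independent of input order, and why a tag plus varint length prefix keeps adjacent fields from blurring into each other.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"hash"
	"sort"
)

// item is a hypothetical, simplified stand-in for the engine's hashItem.
type item struct {
	ruleHash string
	kind     int64
	bucket   string
}

// sortItems is the single comparator every digest shares:
// (ruleHash, kind, bucket).
func sortItems(items []item) {
	sort.Slice(items, func(i, j int) bool {
		a, b := items[i], items[j]
		if a.ruleHash != b.ruleHash {
			return a.ruleHash < b.ruleHash
		}
		if a.kind != b.kind {
			return a.kind < b.kind
		}
		return a.bucket < b.bucket
	})
}

// writer frames every field as a one-byte tag followed by a varint.
type writer struct {
	h      hash.Hash
	lenbuf [binary.MaxVarintLen64]byte
}

// field writes tag, the uvarint length of b, then b itself.
func (w *writer) field(tag byte, b []byte) {
	w.h.Write([]byte{tag})
	n := binary.PutUvarint(w.lenbuf[:], uint64(len(b)))
	w.h.Write(w.lenbuf[:n])
	w.h.Write(b)
}

// integer writes tag followed by the signed varint encoding of v.
func (w *writer) integer(tag byte, v int64) {
	w.h.Write([]byte{tag})
	n := binary.PutVarint(w.lenbuf[:], v)
	w.h.Write(w.lenbuf[:n])
}

// digest sorts a copy of items with the shared comparator, then hashes
// each item's fields through the framed writer.
func digest(items []item) [32]byte {
	sorted := append([]item(nil), items...)
	sortItems(sorted)
	w := &writer{h: sha256.New()}
	for _, it := range sorted {
		w.field(0x01, []byte(it.bucket))
		w.field(0x02, []byte(it.ruleHash))
		w.integer(0x03, it.kind)
	}
	var out [32]byte
	copy(out[:], w.h.Sum(nil))
	return out
}
```

In this sketch, any permutation of the same items produces the same digest, while two items whose bucket and ruleHash concatenate to the same bytes (for example "ab"+"c" versus "a"+"bc") still hash differently because each field carries its own length.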

go build ./... clean; engine test suite green.
Chris Lu
2026-05-11 18:07:54 -07:00
committed by GitHub
parent 122ca7c020
commit 3f4cb6d2fb
4 changed files with 1037 additions and 0 deletions

@@ -0,0 +1,205 @@
package engine
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"hash"
"sort"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
)
// hashItem pairs a ruleset member with its parent action so the
// sort+hash helpers below can be type-agnostic. Internal type — every
// hash callsite collects items into a slice, then hands it off.
type hashItem struct {
key s3lifecycle.ActionKey
action *CompiledAction
}
// sortHashItems orders items by (RuleHash, ActionKind, Bucket). The
// composition matches the ActionKey identity model: RuleHash is the
// primary content-derived identifier, ActionKind disambiguates siblings
// of one rule, Bucket scopes by bucket so the same XML in two buckets
// hashes distinctly. Shared between ReplayContentHash and PromotedHash
// so the two helpers agree on the on-wire ordering — if one drifted,
// the cursor could see "rule changed" on a no-op snapshot rebuild.
func sortHashItems(items []hashItem) {
sort.Slice(items, func(i, j int) bool {
if c := bytes.Compare(items[i].key.RuleHash[:], items[j].key.RuleHash[:]); c != 0 {
return c < 0
}
if items[i].key.ActionKind != items[j].key.ActionKind {
return items[i].key.ActionKind < items[j].key.ActionKind
}
return items[i].key.Bucket < items[j].key.Bucket
})
}
// hashWriter is the small tagged writer the hash helpers use. Each field
// is written as a one-byte tag followed by a varint-encoded length or
// value, so a future schema change (new field) can extend the on-wire
// format without colliding with existing values.
type hashWriter struct {
h hash.Hash
lenbuf [binary.MaxVarintLen64]byte
}
func newHashWriter() *hashWriter {
return &hashWriter{h: sha256.New()}
}
// writeField writes a length-prefixed byte field under tag.
func (w *hashWriter) writeField(tag byte, b []byte) {
_, _ = w.h.Write([]byte{tag})
n := binary.PutUvarint(w.lenbuf[:], uint64(len(b)))
_, _ = w.h.Write(w.lenbuf[:n])
_, _ = w.h.Write(b)
}
// writeInt writes a varint-encoded signed integer under tag.
func (w *hashWriter) writeInt(tag byte, v int64) {
_, _ = w.h.Write([]byte{tag})
n := binary.PutVarint(w.lenbuf[:], v)
_, _ = w.h.Write(w.lenbuf[:n])
}
func (w *hashWriter) sum() [32]byte {
var out [32]byte
copy(out[:], w.h.Sum(nil))
return out
}
// ReplayContentHash hashes the content (action kind, predicate, TTL value)
// of every replay-eligible compiled action in the base snapshot. It returns
// the all-zero [32]byte (not the SHA-256 of empty input) when s is nil or
// no replay-eligible action exists. The hash is:
// - Partition-independent. A retention-driven scan_only promotion does
// NOT change this hash; only the dispatch path changes, not the rule
// content. (PromotedHash exists to catch partition flips separately.)
// - Stable across snapshot reorderings. Actions are pre-sorted by
// RuleHash + ActionKind + Bucket so two snapshots with the same rules
// compiled in any order hash identically.
// - Disabled-rule-aware. ModeDisabled actions are excluded so disabling
// a rule changes the hash (it changed the rule set the worker is
// scanning under).
//
// Used as cursor.RuleSetHash. A mismatch between persisted and current
// triggers the recovery branch on next daily_run.
func ReplayContentHash(s *Snapshot) [32]byte {
var empty [32]byte
if s == nil {
return empty
}
var items []hashItem
for k, a := range s.actions {
if a == nil || a.Mode == ModeDisabled {
continue
}
if !isReplayKind(k.ActionKind) {
continue
}
items = append(items, hashItem{key: k, action: a})
}
if len(items) == 0 {
return empty
}
sortHashItems(items)
w := newHashWriter()
for _, it := range items {
w.writeField(0x01, []byte(it.key.Bucket))
w.writeField(0x02, it.key.RuleHash[:])
w.writeInt(0x03, int64(it.key.ActionKind))
// RuleHash already covers the predicate (Prefix + FilterTags + size
// filters) and per-kind TTLs, so we don't need to re-canonicalise
// the *Rule. But we also include the action's effective TTL
// directly so that an "effective TTL of 0" (a malformed rule where
// the kind doesn't match the populated field) is distinguishable
// from a valid one.
w.writeInt(0x04, int64(effectiveTTL(it.action)))
}
return w.sum()
}
// PromotedHash hashes the set of replay-eligible actions that *would* land
// in walk (rather than replay) for the given retentionWindow, either because
// TTL > retentionWindow or because the effective TTL is 0 (malformed rule).
// Returns the all-zero hash when s is nil or nothing is promoted. Takes the
// SAME retentionWindow value as RulesForShard so the two helpers cannot
// disagree about partition membership.
//
// Detects partition flips in either direction:
// - replay → walk (retention dropped): rule appears in this hash but
// didn't before.
// - walk → replay (retention recovered): rule used to appear here but no
// longer does.
//
// In both cases the persisted hash differs from the freshly computed one,
// firing the recovery branch.
//
// A mismatch with the persisted PromotedHash triggers recovery even when
// rule content is unchanged.
func PromotedHash(s *Snapshot, retentionWindow time.Duration) [32]byte {
var empty [32]byte
if s == nil {
return empty
}
var items []hashItem
for k, a := range s.actions {
if a == nil || a.Mode == ModeDisabled {
continue
}
if !isReplayKind(k.ActionKind) {
continue
}
ttl := effectiveTTL(a)
// Mirror RulesForShard's partition predicate exactly: a replay
// kind lands in walk when ttl is 0 (malformed) or ttl >
// retentionWindow. PromotedHash hashes that walk-bound subset.
if ttl > 0 && ttl <= retentionWindow {
continue
}
items = append(items, hashItem{key: k, action: a})
}
if len(items) == 0 {
return empty
}
sortHashItems(items)
w := newHashWriter()
for _, it := range items {
w.writeField(0x01, []byte(it.key.Bucket))
w.writeField(0x02, it.key.RuleHash[:])
w.writeInt(0x03, int64(it.key.ActionKind))
}
return w.sum()
}
// MaxEffectiveTTL returns the maximum effective TTL across the *active*
// replay-eligible actions in s. Returns 0 for a nil snapshot or one with no
// active replay actions; the caller is expected to be in the empty-replay
// branch already (per the design's sentinel-cursor logic).
//
// "Effective TTL" mirrors the partition predicate in views.go: derived from
// the rule field that matches the action kind. Walker-only action kinds
// (ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent) contribute
// nothing — they're either not replay-eligible or, in a walk view, not
// active in the replay sense.
func MaxEffectiveTTL(s *Snapshot) time.Duration {
if s == nil {
return 0
}
// Named maxTTL rather than max to avoid shadowing the Go 1.21+ builtin.
var maxTTL time.Duration
for k, a := range s.actions {
if a == nil || !a.IsActive() {
continue
}
if !isReplayKind(k.ActionKind) {
continue
}
if ttl := effectiveTTL(a); ttl > maxTTL {
maxTTL = ttl
}
}
return maxTTL
}

@@ -0,0 +1,273 @@
package engine
import (
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestReplayContentHash_EmptyWhenNoReplayActions(t *testing.T) {
var empty [32]byte
// Nil snapshot.
assert.Equal(t, empty, ReplayContentHash(nil))
// Walker-only rule: ExpirationDate compiles to a single ScanAtDate
// action which is excluded from ReplayContentHash.
when := time.Now().Add(24 * time.Hour)
rule := &s3lifecycle.Rule{
ID: "d",
Status: s3lifecycle.StatusEnabled,
ExpirationDate: when,
}
snap := buildSnapshotForViews(t, "b1", rule)
assert.Equal(t, empty, ReplayContentHash(snap))
}
func TestReplayContentHash_PartitionIndependent(t *testing.T) {
// Critical invariant: a retention shift that promotes a rule from
// replay to walk (or back) MUST NOT change ReplayContentHash. The hash
// is over the rule content as it appears in the base snapshot, not
// over the partition it ends up in.
rule := &s3lifecycle.Rule{
ID: "shift",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
}
snap := buildSnapshotForViews(t, "b1", rule)
hashWithLargeRetention := ReplayContentHash(snap)
// Whether retention is 1d (promotes to walk) or 365d (stays in
// replay), the base snapshot's rule content didn't move — only its
// partition did. ReplayContentHash hashes the base, so this is
// trivially constant. We re-call here to pin the contract.
_, walkSmall := snap.RulesForShard(0, s3lifecycle.DaysToDuration(1))
require.NotNil(t, walkSmall, "precondition: 1d retention promotes 30d rule to walk")
replayLarge, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(365))
require.NotNil(t, replayLarge, "precondition: 365d retention keeps 30d rule in replay")
hashAgain := ReplayContentHash(snap)
assert.Equal(t, hashWithLargeRetention, hashAgain,
"ReplayContentHash must be partition-independent")
}
func TestReplayContentHash_ChangesOnRuleContentEdit(t *testing.T) {
rule := &s3lifecycle.Rule{
ID: "r",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
}
snap1 := buildSnapshotForViews(t, "b1", rule)
h1 := ReplayContentHash(snap1)
// Edit the TTL — RuleHash changes, so the action's key changes, and
// the content hash must move.
rule2 := &s3lifecycle.Rule{
ID: "r",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 60,
}
snap2 := buildSnapshotForViews(t, "b1", rule2)
h2 := ReplayContentHash(snap2)
assert.NotEqual(t, h1, h2, "TTL edit must change ReplayContentHash")
}
func TestReplayContentHash_StableAcrossRuleReorder(t *testing.T) {
r1 := &s3lifecycle.Rule{ID: "a", Status: s3lifecycle.StatusEnabled, ExpirationDays: 7, Prefix: "a/"}
r2 := &s3lifecycle.Rule{ID: "b", Status: s3lifecycle.StatusEnabled, ExpirationDays: 14, Prefix: "b/"}
snapAB := buildSnapshotForViews(t, "b1", r1, r2)
snapBA := buildSnapshotForViews(t, "b1", r2, r1)
assert.Equal(t, ReplayContentHash(snapAB), ReplayContentHash(snapBA),
"reordering rules at compile time must not change ReplayContentHash")
}
func TestReplayContentHash_ExcludesWalkerOnlyAndDisabled(t *testing.T) {
// Walker-only kinds and disabled rules don't affect ReplayContentHash;
// only replay-eligible action kinds count.
replayRule := &s3lifecycle.Rule{
ID: "rep",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 7,
}
snapOnlyReplay := buildSnapshotForViews(t, "b1", replayRule)
baseline := ReplayContentHash(snapOnlyReplay)
walkerRule := &s3lifecycle.Rule{
ID: "walker",
Status: s3lifecycle.StatusEnabled,
ExpiredObjectDeleteMarker: true,
}
disabledRule := &s3lifecycle.Rule{
ID: "off",
Status: s3lifecycle.StatusDisabled,
ExpirationDays: 90,
}
snapMixed := buildSnapshotForViews(t, "b1", replayRule, walkerRule, disabledRule)
mixed := ReplayContentHash(snapMixed)
assert.Equal(t, baseline, mixed,
"adding walker-only and disabled rules must not change ReplayContentHash")
}
func TestPromotedHash_EmptyWhenNoneAreInWalk(t *testing.T) {
var empty [32]byte
assert.Equal(t, empty, PromotedHash(nil, 0))
rule := &s3lifecycle.Rule{
ID: "fits",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 7,
}
snap := buildSnapshotForViews(t, "b1", rule)
// Large retention → rule stays in replay → no promotion.
assert.Equal(t, empty, PromotedHash(snap, s3lifecycle.DaysToDuration(365)))
}
func TestPromotedHash_ChangesOnReplayToWalkPromotion(t *testing.T) {
// Retention drops below the rule's TTL → rule promotes from replay to
// walk → PromotedHash changes from empty to non-empty.
rule := &s3lifecycle.Rule{
ID: "long",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
}
snap := buildSnapshotForViews(t, "b1", rule)
before := PromotedHash(snap, s3lifecycle.DaysToDuration(365)) // replay
after := PromotedHash(snap, s3lifecycle.DaysToDuration(7)) // promoted to walk
var empty [32]byte
assert.Equal(t, empty, before)
assert.NotEqual(t, empty, after)
assert.NotEqual(t, before, after, "promotion must change PromotedHash")
}
func TestPromotedHash_ChangesOnWalkToReplayDemotion(t *testing.T) {
// Retention recovers from < TTL to >= TTL → rule demotes from walk to
// replay → PromotedHash changes from non-empty to empty.
rule := &s3lifecycle.Rule{
ID: "demo",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
}
snap := buildSnapshotForViews(t, "b1", rule)
before := PromotedHash(snap, s3lifecycle.DaysToDuration(7)) // walk
after := PromotedHash(snap, s3lifecycle.DaysToDuration(365)) // demoted back to replay
var empty [32]byte
assert.NotEqual(t, empty, before)
assert.Equal(t, empty, after)
assert.NotEqual(t, before, after, "demotion must change PromotedHash")
}
func TestPromotedHash_StableWhenContentUnchangedAndPartitionStays(t *testing.T) {
// Same snapshot + same retentionWindow → same PromotedHash.
rule := &s3lifecycle.Rule{
ID: "stable",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
}
snap := buildSnapshotForViews(t, "b1", rule)
rw := s3lifecycle.DaysToDuration(7) // promoted
assert.Equal(t, PromotedHash(snap, rw), PromotedHash(snap, rw))
}
func TestPromotedHash_MatchesRulesForShardWalkMembership(t *testing.T) {
// PromotedHash must agree with RulesForShard about which replay-
// eligible actions are in walk for the same retentionWindow.
rules := []*s3lifecycle.Rule{
{ID: "short", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1},
{ID: "long", Status: s3lifecycle.StatusEnabled, ExpirationDays: 90},
{ID: "noncurrent", Status: s3lifecycle.StatusEnabled, NoncurrentVersionExpirationDays: 200},
}
snap := buildSnapshotForViews(t, "b1", rules...)
rw := s3lifecycle.DaysToDuration(30)
replay, walk := snap.RulesForShard(0, rw)
require.NotNil(t, replay)
require.NotNil(t, walk)
// Expected walk members (from replay-eligible kinds only): long, noncurrent.
wantPromoted := map[s3lifecycle.ActionKey]struct{}{}
for k := range walk.actions {
if isReplayKind(k.ActionKind) {
wantPromoted[k] = struct{}{}
}
}
assert.Len(t, wantPromoted, 2, "two of three replay-eligible rules should promote")
hash := PromotedHash(snap, rw)
var empty [32]byte
assert.NotEqual(t, empty, hash)
}
func TestMaxEffectiveTTL_NilAndEmpty(t *testing.T) {
assert.Equal(t, time.Duration(0), MaxEffectiveTTL(nil))
// Snapshot with only walker-only kinds → no active replay action →
// MaxEffectiveTTL returns 0.
rule := &s3lifecycle.Rule{
ID: "walk",
Status: s3lifecycle.StatusEnabled,
ExpiredObjectDeleteMarker: true,
}
snap := buildSnapshotForViews(t, "b1", rule)
assert.Equal(t, time.Duration(0), MaxEffectiveTTL(snap))
}
func TestMaxEffectiveTTL_ReturnsMaxAcrossActiveReplay(t *testing.T) {
// Three replay-eligible actions with mixed TTLs; MaxEffectiveTTL picks
// the largest.
rule := &s3lifecycle.Rule{
ID: "mix",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 10,
NoncurrentVersionExpirationDays: 60,
AbortMPUDaysAfterInitiation: 3,
}
snap := buildSnapshotForViews(t, "b1", rule)
got := MaxEffectiveTTL(snap)
assert.Equal(t, s3lifecycle.DaysToDuration(60), got)
}
func TestMaxEffectiveTTL_OperatesOnReplayView(t *testing.T) {
// The caller passes the *replay* snapshot here in production. Verify
// that on a replay view the answer is the max over the view's
// (active) replay actions and excludes anything that was routed away
// to walk via scan_only promotion.
rule := &s3lifecycle.Rule{
ID: "mix",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 10,
NoncurrentVersionExpirationDays: 200, // would promote at small retention
}
snap := buildSnapshotForViews(t, "b1", rule)
// Retention 30d: ExpirationDays(10) stays in replay; NoncurrentDays(200)
// promotes to walk. The replay view's MaxEffectiveTTL is therefore 10d.
replay, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
require.NotNil(t, replay)
assert.Equal(t, s3lifecycle.DaysToDuration(10), MaxEffectiveTTL(replay))
}
func TestMaxEffectiveTTL_IgnoresInactiveActions(t *testing.T) {
// Pre-BootstrapComplete event-driven action is in the snapshot but
// inactive. MaxEffectiveTTL only counts active actions.
rule := &s3lifecycle.Rule{
ID: "pending",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
}
snap := New().Compile(
[]CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}},
CompileOptions{}, // no PriorState → inactive
)
for _, a := range snap.actions {
require.False(t, a.IsActive(), "precondition")
}
assert.Equal(t, time.Duration(0), MaxEffectiveTTL(snap),
"inactive replay actions must not contribute to MaxEffectiveTTL")
}

@@ -0,0 +1,234 @@
package engine
import (
"bytes"
"sort"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
)
// currentEngine is the package-level pointer the wiring will eventually
// populate so daily_run can fetch the live snapshot through CurrentSnapshot.
// Today the worker/scheduler owns its own *Engine and passes the snapshot
// explicitly to its consumers; nothing calls SetCurrentEngine yet. Phase 4's
// daily-replay caller will register the worker's engine here on startup so
// the package-level helper resolves without threading the *Engine through
// every layer.
//
// TODO(phase4-wiring): hook scheduler/worker startup to call SetCurrentEngine.
var currentEngine atomic.Pointer[Engine]
// SetCurrentEngine installs e as the package-level engine returned by
// CurrentSnapshot. Intended for the worker bootstrap path; tests may also
// call it with a locally-constructed engine. Passing nil clears the pointer.
func SetCurrentEngine(e *Engine) { currentEngine.Store(e) }
// CurrentSnapshot returns the latest *Snapshot from the package-level engine,
// or nil if no engine has been registered yet. The caller is responsible for
// nil-checking the return — a missing engine is treated as "no rules" so the
// daily-run loop short-circuits cleanly instead of panicking during early
// startup.
//
// TODO(phase4-wiring): SetCurrentEngine must be invoked at worker startup
// for production callers; until then this returns nil.
func CurrentSnapshot() *Snapshot {
e := currentEngine.Load()
if e == nil {
return nil
}
return e.Snapshot()
}
// cloneAction makes an independent *CompiledAction for a per-view snapshot.
// The clone shares Rule (immutable) and Key (value) with the base, but its
// engineState (the active bit) is its own atomic — so flipping active on the
// clone never reaches the base or any sibling view. Mode is set explicitly by
// the caller to encode the per-view dispatch contract (replay forces
// ModeEventDriven so router.Route's gate passes; walk and recovery preserve
// the base Mode so the walker's "any non-Disabled" gate works as-is).
func cloneAction(src *CompiledAction, mode RuleMode, active bool) *CompiledAction {
if src == nil {
return nil
}
dst := &CompiledAction{
Rule: src.Rule,
Bucket: src.Bucket,
Key: src.Key,
Delay: src.Delay,
PredicateSensitive: src.PredicateSensitive,
Mode: mode,
}
if active {
dst.markActive()
}
return dst
}
// newView constructs a *Snapshot that holds the supplied per-view action
// clones but otherwise shares the base's immutable rule / index data by
// pointer (see DESIGN.md "router.Route integration" — only `active` and
// `Mode` differ per view; everything else is share-by-pointer).
//
// Returns nil if actions is empty: callers consume nil as "this partition
// has no actions, skip the corresponding dispatch path."
func newView(base *Snapshot, actions map[s3lifecycle.ActionKey]*CompiledAction) *Snapshot {
if len(actions) == 0 {
return nil
}
// allActionsSorted mirrors the base order: same comparator (Bucket,
// RuleHash, ActionKind) so downstream iteration order is stable per view.
sorted := make([]*CompiledAction, 0, len(actions))
for _, a := range actions {
sorted = append(sorted, a)
}
sort.Slice(sorted, func(i, j int) bool {
a, b := sorted[i], sorted[j]
if a.Bucket != b.Bucket {
return a.Bucket < b.Bucket
}
if c := bytes.Compare(a.Key.RuleHash[:], b.Key.RuleHash[:]); c != 0 {
return c < 0
}
return a.Key.ActionKind < b.Key.ActionKind
})
return &Snapshot{
id: base.id,
buckets: base.buckets,
actions: actions,
allActionsSorted: sorted,
originalDelayGroups: base.originalDelayGroups,
predicateActions: base.predicateActions,
dateActions: base.dateActions,
}
}
// isReplayKind reports whether ActionKind is a candidate for the replay
// partition: its trigger derives from a single event's TsNs (or a stamped
// noncurrent_since), so the meta-log scan can drive dispatch directly.
// ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent are walker-only
// because their due time depends on current bucket state, not event age.
func isReplayKind(k s3lifecycle.ActionKind) bool {
switch k {
case s3lifecycle.ActionKindExpirationDays,
s3lifecycle.ActionKindNoncurrentDays,
s3lifecycle.ActionKindAbortMPU:
return true
}
return false
}
// effectiveTTL is the duration used both for partition membership
// (0 < TTL ≤ retentionWindow → replay) and MaxEffectiveTTL math (driving the
// cursor's sliding scan window). Walker-only kinds, and replay kinds whose
// matching rule field is unset, return 0, so they never contribute to
// MaxEffectiveTTL and never satisfy the replay gate, which requires TTL > 0.
func effectiveTTL(a *CompiledAction) time.Duration {
if a == nil || a.Rule == nil {
return 0
}
switch a.Key.ActionKind {
case s3lifecycle.ActionKindExpirationDays:
if a.Rule.ExpirationDays > 0 {
return s3lifecycle.DaysToDuration(a.Rule.ExpirationDays)
}
case s3lifecycle.ActionKindNoncurrentDays:
if a.Rule.NoncurrentVersionExpirationDays > 0 {
return s3lifecycle.DaysToDuration(a.Rule.NoncurrentVersionExpirationDays)
}
case s3lifecycle.ActionKindAbortMPU:
if a.Rule.AbortMPUDaysAfterInitiation > 0 {
return s3lifecycle.DaysToDuration(a.Rule.AbortMPUDaysAfterInitiation)
}
}
return 0
}
// RulesForShard partitions the base snapshot's compiled actions into a
// replay view and a walk view.
//
// shardID is **reserved for forward-compatibility** and not consumed by
// this implementation. Every shard sees every rule today because the
// shard filter runs at the entry-iteration site (meta-log subscription
// per shard, walker entry-loop ShardID check) rather than at view
// construction. Keeping the parameter in the signature preserves the
// API surface for a future move to per-shard rule sets (e.g. when
// bucket-shard ownership is added) — removing it now would be a
// breaking change at that future point. See DESIGN.md "open questions
// → per-shard rule snapshots vs. global."
//
// retentionWindow is the only window input the engine sees; the
// daily_run caller computes it as `now - earliest_available` and
// passes it in so the partition is stable across one daily_run
// invocation.
//
// Membership:
// - replay: clones of ExpirationDays / NoncurrentDays / AbortMPU actions
// whose TTL ≤ retentionWindow. active=true, Mode forced to
// ModeEventDriven (rehabilitating any prior ModeScanOnly lock-in so
// router.Route's gate accepts the clone).
// - walk: clones of ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent
// actions, plus any replay-eligible action whose TTL exceeds
// retentionWindow (the "scan_only promotion" path). active=true, Mode
// preserved from the base — the walker only rejects ModeDisabled.
//
// Either return value may be nil when its partition has no actions.
// Disabled actions (Mode == ModeDisabled) are excluded from both views.
func (s *Snapshot) RulesForShard(shardID int, retentionWindow time.Duration) (replay, walk *Snapshot) {
_ = shardID
if s == nil {
return nil, nil
}
replayActions := map[s3lifecycle.ActionKey]*CompiledAction{}
walkActions := map[s3lifecycle.ActionKey]*CompiledAction{}
for key, a := range s.actions {
if a == nil || a.Mode == ModeDisabled {
continue
}
if isReplayKind(key.ActionKind) {
ttl := effectiveTTL(a)
// retentionWindow == 0 means "no retention info supplied". No
// positive ttl satisfies ttl <= 0, so every replay-eligible action
// routes to walk: promoting rules into replay without retention
// proof would defeat the scan_only protection. ttl == 0 is a
// malformed rule (kind says replay but no TTL): it also routes to
// walk so the walker can decide.
if ttl > 0 && ttl <= retentionWindow {
replayActions[key] = cloneAction(a, ModeEventDriven, true)
} else {
walkActions[key] = cloneAction(a, a.Mode, true)
}
continue
}
// Walker-only kinds: preserve Mode (ModeScanAtDate for
// ExpirationDate, ModeEventDriven for the others) so the walker's
// per-rule evaluator runs as today.
walkActions[key] = cloneAction(a, a.Mode, true)
}
return newView(s, replayActions), newView(s, walkActions)
}
// RecoveryView returns a *Snapshot that contains a clone of every action in
// the base, with active=true regardless of compile-time IsActive/
// BootstrapComplete and Mode preserved as-is. The recovery walker uses this
// view to catch already-due objects across the full rule set on cold start,
// rule edit, retention loss, or partition flip. Disabled actions stay out —
// recovery shouldn't resurrect an explicitly disabled rule.
//
// Unlike RulesForShard, RecoveryView takes no retentionWindow: every
// non-disabled action is activated, partition-independent, because recovery
// must reach rules whose subjects sit outside the replay scan window.
func RecoveryView(s *Snapshot) *Snapshot {
if s == nil {
return nil
}
actions := map[s3lifecycle.ActionKey]*CompiledAction{}
for key, a := range s.actions {
if a == nil || a.Mode == ModeDisabled {
continue
}
actions[key] = cloneAction(a, a.Mode, true)
}
return newView(s, actions)
}

@@ -0,0 +1,325 @@
package engine
import (
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// buildSnapshotForViews compiles a snapshot for a single bucket holding the
// supplied rules, marking every action bootstrap_complete so
// the base actions are active. Tests can then call RulesForShard and assert
// the per-view clones come out independent.
func buildSnapshotForViews(t *testing.T, bucket string, rules ...*s3lifecycle.Rule) *Snapshot {
t.Helper()
prior := map[s3lifecycle.ActionKey]PriorState{}
for _, r := range rules {
rh := s3lifecycle.RuleHash(r)
for _, k := range s3lifecycle.RuleActionKinds(r) {
prior[s3lifecycle.ActionKey{Bucket: bucket, RuleHash: rh, ActionKind: k}] = PriorState{
BootstrapComplete: true,
}
}
}
return New().Compile(
[]CompileInput{{Bucket: bucket, Rules: rules}},
CompileOptions{PriorStates: prior},
)
}
func TestCurrentSnapshot_NilWhenNoEngineRegistered(t *testing.T) {
// Save and restore the package-level pointer so tests stay isolated.
prev := currentEngine.Load()
t.Cleanup(func() { currentEngine.Store(prev) })
SetCurrentEngine(nil)
assert.Nil(t, CurrentSnapshot())
}
func TestCurrentSnapshot_ReturnsRegisteredEngineSnapshot(t *testing.T) {
prev := currentEngine.Load()
t.Cleanup(func() { currentEngine.Store(prev) })
e := New()
rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 7}
snap := e.Compile([]CompileInput{{Bucket: "b", Rules: []*s3lifecycle.Rule{rule}}}, CompileOptions{})
SetCurrentEngine(e)
assert.Same(t, snap, CurrentSnapshot())
}
func TestRulesForShard_NilSnapshot(t *testing.T) {
var s *Snapshot
replay, walk := s.RulesForShard(0, 30*24*time.Hour)
assert.Nil(t, replay)
assert.Nil(t, walk)
}
func TestRulesForShard_ReplayPartitionMembership(t *testing.T) {
// ExpirationDays / NoncurrentDays / AbortMPU below retentionWindow all
// land in replay; the replay map carries clones with active=true and
// Mode=ModeEventDriven.
rule := &s3lifecycle.Rule{
ID: "all-replay",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 3,
NoncurrentVersionExpirationDays: 5,
AbortMPUDaysAfterInitiation: 2,
}
snap := buildSnapshotForViews(t, "b1", rule)
replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
require.NotNil(t, replay, "all three replay-eligible kinds should land in replay")
assert.Nil(t, walk, "no walker kinds, no scan_only promotion, walk should be empty")
assert.Len(t, replay.actions, 3)
for k, a := range replay.actions {
assert.True(t, a.IsActive(), "replay clone must be active for kind %v", k.ActionKind)
assert.Equal(t, ModeEventDriven, a.Mode, "replay Mode must be ModeEventDriven for kind %v", k.ActionKind)
}
}
func TestRulesForShard_WalkOnlyKindsRouteToWalk(t *testing.T) {
// ExpirationDate, ExpiredObjectDeleteMarker, NewerNoncurrent are walker-
// only and must land in walk regardless of retentionWindow.
when := time.Now().Add(24 * time.Hour)
rule := &s3lifecycle.Rule{
ID: "walker-mix",
Status: s3lifecycle.StatusEnabled,
ExpirationDate: when,
ExpiredObjectDeleteMarker: true,
NewerNoncurrentVersions: 2,
}
snap := buildSnapshotForViews(t, "b1", rule)
replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
assert.Nil(t, replay, "walker-only rule has no replay partition")
require.NotNil(t, walk)
assert.Len(t, walk.actions, 3, "all three walker kinds should land in walk")
for _, a := range walk.actions {
assert.True(t, a.IsActive())
}
}

func TestRulesForShard_MultiActionXMLRuleSplits(t *testing.T) {
	// A single XML rule that compiles to ExpirationDays + NewerNoncurrent
	// must split: ExpirationDays → replay, NewerNoncurrent → walk.
	rule := &s3lifecycle.Rule{
		ID:                      "split",
		Status:                  s3lifecycle.StatusEnabled,
		ExpirationDays:          3,
		NewerNoncurrentVersions: 5,
	}
	snap := buildSnapshotForViews(t, "b1", rule)
	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
	require.NotNil(t, replay)
	require.NotNil(t, walk)
	assert.Len(t, replay.actions, 1)
	assert.Len(t, walk.actions, 1)
	for k := range replay.actions {
		assert.Equal(t, s3lifecycle.ActionKindExpirationDays, k.ActionKind)
	}
	for k := range walk.actions {
		assert.Equal(t, s3lifecycle.ActionKindNewerNoncurrent, k.ActionKind)
	}
}

func TestRulesForShard_ScanOnlyPromotionRoutesToWalk(t *testing.T) {
	// A replay-eligible kind whose TTL exceeds retentionWindow is promoted
	// to walk. The walk clone is marked active, and its Mode is preserved
	// as-is from the base (the walker only rejects ModeDisabled).
	rule := &s3lifecycle.Rule{
		ID:                          "long",
		Status:                      s3lifecycle.StatusEnabled,
		ExpirationDays:              90,
		AbortMPUDaysAfterInitiation: 1,
	}
	snap := buildSnapshotForViews(t, "b1", rule)
	// Retention window of 30 days promotes ExpirationDays(90d) to walk;
	// AbortMPU(1d) stays in replay.
	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
	require.NotNil(t, replay)
	require.NotNil(t, walk)
	assert.Len(t, replay.actions, 1)
	assert.Len(t, walk.actions, 1)
	for k, a := range replay.actions {
		assert.Equal(t, s3lifecycle.ActionKindAbortMPU, k.ActionKind)
		assert.Equal(t, ModeEventDriven, a.Mode)
	}
	for k := range walk.actions {
		assert.Equal(t, s3lifecycle.ActionKindExpirationDays, k.ActionKind)
	}
}

func TestRulesForShard_ReplayRewritesScanOnlyMode(t *testing.T) {
	// A base action carrying Mode=ModeScanOnly via PriorState must still
	// land in replay as ModeEventDriven once its TTL fits inside the
	// retention window again. router.Route gates on ModeEventDriven, so
	// this Mode rewrite is the rehabilitation contract.
	rule := &s3lifecycle.Rule{
		ID:             "rehab",
		Status:         s3lifecycle.StatusEnabled,
		ExpirationDays: 5,
	}
	rh := s3lifecycle.RuleHash(rule)
	key := s3lifecycle.ActionKey{Bucket: "b1", RuleHash: rh, ActionKind: s3lifecycle.ActionKindExpirationDays}
	snap := New().Compile(
		[]CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}},
		CompileOptions{
			PriorStates: map[s3lifecycle.ActionKey]PriorState{
				// Persistent ModeScanOnly carried over from a prior compile.
				key: {BootstrapComplete: true, Mode: ModeScanOnly},
			},
		},
	)
	// Verify the base really did keep ModeScanOnly (Compile preserves it).
	require.Equal(t, ModeScanOnly, snap.actions[key].Mode)
	replay, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
	require.NotNil(t, replay)
	clone := replay.actions[key]
	require.NotNil(t, clone)
	assert.Equal(t, ModeEventDriven, clone.Mode, "replay must force ModeEventDriven on the clone")
	assert.True(t, clone.IsActive())
}

func TestRulesForShard_DisabledRuleExcludedFromBothViews(t *testing.T) {
	// Disabled actions are excluded from every view, so a snapshot holding
	// only a disabled rule yields neither a replay nor a walk partition.
	rule := &s3lifecycle.Rule{ID: "off", Status: s3lifecycle.StatusDisabled, ExpirationDays: 7}
	snap := buildSnapshotForViews(t, "b1", rule)
	replay, walk := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
	assert.Nil(t, replay)
	assert.Nil(t, walk)
}

func TestRulesForShard_ClonesAreIndependentOfBase(t *testing.T) {
	// Toggling active on a view clone must never propagate to the base
	// action; that is the whole point of cloning per view.
	rule := &s3lifecycle.Rule{ID: "iso", Status: s3lifecycle.StatusEnabled, ExpirationDays: 3}
	snap := buildSnapshotForViews(t, "b1", rule)
	key := s3lifecycle.ActionKey{
		Bucket:     "b1",
		RuleHash:   s3lifecycle.RuleHash(rule),
		ActionKind: s3lifecycle.ActionKindExpirationDays,
	}
	base := snap.Action(key)
	require.NotNil(t, base)
	baseActiveBefore := base.IsActive()
	replay, _ := snap.RulesForShard(0, s3lifecycle.DaysToDuration(30))
	require.NotNil(t, replay)
	clone := replay.actions[key]
	require.NotNil(t, clone)
	assert.NotSame(t, base, clone, "clone must be a distinct *CompiledAction")
	// Force the clone inactive (write directly to its atomic so we exercise
	// the active bit boundary, not just markActive's setter).
	clone.engineState.Store(engineStateInactive)
	assert.False(t, clone.IsActive())
	assert.Equal(t, baseActiveBefore, base.IsActive(),
		"toggling the clone must not perturb the base's active bit")
}

func TestRulesForShard_ZeroRetentionRoutesAllReplayKindsToWalk(t *testing.T) {
	// retentionWindow == 0 (no proven retention) must route every replay-
	// eligible action into walk: the replay partition is "ttl > 0 && ttl <=
	// retentionWindow", and no positive ttl satisfies ttl <= 0. This is the
	// safe default before the meta-log horizon is known.
	rule := &s3lifecycle.Rule{
		ID:             "any",
		Status:         s3lifecycle.StatusEnabled,
		ExpirationDays: 1,
	}
	snap := buildSnapshotForViews(t, "b1", rule)
	replay, walk := snap.RulesForShard(0, 0)
	assert.Nil(t, replay)
	require.NotNil(t, walk)
	assert.Len(t, walk.actions, 1)
}

func TestRecoveryView_ActivatesEveryNonDisabledAction(t *testing.T) {
	// Base snapshot has no PriorState → event-driven actions land inactive
	// (pre-BootstrapComplete). Recovery view must clone them with active=
	// true so the recovery walker emits matches for them.
	rule := &s3lifecycle.Rule{
		ID:             "pending",
		Status:         s3lifecycle.StatusEnabled,
		ExpirationDays: 7,
	}
	snap := New().Compile(
		[]CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}},
		CompileOptions{}, // no PriorStates → action lands inactive
	)
	key := s3lifecycle.ActionKey{
		Bucket:     "b1",
		RuleHash:   s3lifecycle.RuleHash(rule),
		ActionKind: s3lifecycle.ActionKindExpirationDays,
	}
	require.False(t, snap.actions[key].IsActive(), "precondition: base action must be inactive")
	rv := RecoveryView(snap)
	require.NotNil(t, rv)
	clone := rv.actions[key]
	require.NotNil(t, clone)
	assert.True(t, clone.IsActive(), "recovery view must force-activate even pre-BootstrapComplete actions")
	assert.Equal(t, ModeEventDriven, clone.Mode, "recovery preserves base Mode")
}

func TestRecoveryView_PreservesBaseModeForScanAtDate(t *testing.T) {
	// ScanAtDate actions are active at compile time (no BootstrapComplete
	// requirement). Recovery view must keep their Mode as ModeScanAtDate so
	// the walker's date evaluator runs as expected; a Mode rewrite here
	// would re-route them through the event-driven gate by mistake.
	when := time.Now().Add(48 * time.Hour)
	rule := &s3lifecycle.Rule{
		ID:             "d",
		Status:         s3lifecycle.StatusEnabled,
		ExpirationDate: when,
	}
	snap := New().Compile(
		[]CompileInput{{Bucket: "b1", Rules: []*s3lifecycle.Rule{rule}}},
		CompileOptions{},
	)
	rv := RecoveryView(snap)
	require.NotNil(t, rv)
	require.Len(t, rv.actions, 1)
	for _, a := range rv.actions {
		assert.Equal(t, ModeScanAtDate, a.Mode)
		assert.True(t, a.IsActive())
	}
}

func TestRecoveryView_ExcludesDisabledRules(t *testing.T) {
	rule := &s3lifecycle.Rule{
		ID:             "off",
		Status:         s3lifecycle.StatusDisabled,
		ExpirationDays: 7,
	}
	snap := buildSnapshotForViews(t, "b1", rule)
	rv := RecoveryView(snap)
	assert.Nil(t, rv, "disabled rules must not surface in recovery view")
}

func TestRecoveryView_ClonesAreIndependentOfBase(t *testing.T) {
	// Same isolation contract as the RulesForShard clones: flipping the
	// active bit on a recovery clone must leave the base action untouched.
	rule := &s3lifecycle.Rule{ID: "iso", Status: s3lifecycle.StatusEnabled, ExpirationDays: 3}
	snap := buildSnapshotForViews(t, "b1", rule)
	key := s3lifecycle.ActionKey{
		Bucket:     "b1",
		RuleHash:   s3lifecycle.RuleHash(rule),
		ActionKind: s3lifecycle.ActionKindExpirationDays,
	}
	base := snap.Action(key)
	// Guard against a nil base so a lookup miss fails cleanly instead of
	// panicking on base.IsActive().
	require.NotNil(t, base)
	baseActiveBefore := base.IsActive()
	rv := RecoveryView(snap)
	require.NotNil(t, rv)
	clone := rv.actions[key]
	require.NotNil(t, clone)
	assert.NotSame(t, base, clone)
	clone.engineState.Store(engineStateInactive)
	assert.Equal(t, baseActiveBefore, base.IsActive(),
		"flipping the recovery clone must not perturb the base")
}

func TestRecoveryView_NilSnapshot(t *testing.T) {
	assert.Nil(t, RecoveryView(nil))
}