feat(s3/lifecycle): bucket-level bootstrap walker (#9350)

* feat(worker): add TaskTypeS3Lifecycle constant

Single job type for the lifecycle worker; the S3LifecycleParams.Subtype
field (READ / BOOTSTRAP / DRAIN) dispatches inside the handler. The
"s3_lifecycle" string is already wired to LaneLifecycle in
admin/plugin/scheduler_lane.go so adding the constant doesn't change
runtime behavior — it lets future commits reference the type name
without sprinkling string literals.
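
As a rough illustration of the "one job type, dispatch on subtype" shape,
the sketch below routes a single task type to per-subtype branches. The
Subtype type, its constant names, and the handle function are hypothetical
stand-ins for S3LifecycleParams.Subtype and the real handler, neither of
which appears in this commit.

```go
package main

import "fmt"

// TaskType is a string-backed task type, as in the worker package.
type TaskType string

// TaskTypeS3Lifecycle is the single job type for lifecycle work.
const TaskTypeS3Lifecycle TaskType = "s3_lifecycle"

// Subtype is a hypothetical stand-in for S3LifecycleParams.Subtype.
type Subtype int

const (
	SubtypeRead Subtype = iota
	SubtypeBootstrap
	SubtypeDrain
)

// handle routes one job type to a per-subtype branch (illustrative only).
func handle(t TaskType, s Subtype) string {
	if t != TaskTypeS3Lifecycle {
		return "unsupported"
	}
	switch s {
	case SubtypeRead:
		return "read"
	case SubtypeBootstrap:
		return "bootstrap"
	case SubtypeDrain:
		return "drain"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(handle(TaskTypeS3Lifecycle, SubtypeBootstrap))
	fmt.Println(handle(TaskType("balance"), SubtypeRead))
}
```

Comparing against the named constant rather than a bare "s3_lifecycle"
literal means a typo becomes a compile error instead of a silent mismatch.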

* feat(s3/lifecycle): bucket-level bootstrap walker

Iterates entries in a bucket, evaluates every active ActionKey in the
engine snapshot against each entry, and dispatches inline-delete for
currently-due actions. Date-kind actions and pending_bootstrap actions
are skipped — the former are handled by their own SCAN_AT_DATE
bootstrap, the latter aren't IsActive() yet.

Walker is callback-driven so callers supply the listing source
(real filer_pb.SeaweedList or test fake) and the dispatcher (real
LifecycleDelete client or test fake). This keeps the walker free of
filer_pb dependencies and makes the per-action evaluation flow
unit-testable in isolation.

Checkpoint state (LastScannedPath, Completed) is returned to the
caller, who is responsible for persisting it under
/etc/s3/lifecycle/<bucket>/_bootstrap. Walk() honours opts.Resume so
a kill-resumed task picks up where the previous walker stopped.
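
The checkpoint and resume behavior can be sketched in isolation. The code
below is a simplified model, not the walker itself: Entry carries only a
Path, ListFunc drops the context and bucket arguments, and sliceList, walk,
and errTransient are names invented for the illustration. It assumes, as
the walker does, that the checkpoint advances only after an entry has been
handled without error.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is reduced to the one field the resume logic needs.
type Entry struct{ Path string }

// ListFunc must skip entries with Path <= start.
type ListFunc func(start string, cb func(*Entry) error) error

// Checkpoint is the state the caller would persist between runs.
type Checkpoint struct {
	LastScannedPath string
	Completed       bool
}

var errTransient = errors.New("transient dispatch failure")

// sliceList serves entries from memory and honours the skip contract.
func sliceList(entries []*Entry) ListFunc {
	return func(start string, cb func(*Entry) error) error {
		for _, e := range entries {
			if start != "" && e.Path <= start {
				continue
			}
			if err := cb(e); err != nil {
				return err
			}
		}
		return nil
	}
}

// walk advances the checkpoint only after handle succeeds for an entry.
func walk(list ListFunc, resume string, handle func(*Entry) error) (Checkpoint, error) {
	cp := Checkpoint{LastScannedPath: resume}
	err := list(resume, func(e *Entry) error {
		if err := handle(e); err != nil {
			return err
		}
		cp.LastScannedPath = e.Path
		return nil
	})
	if err != nil {
		return cp, err
	}
	cp.Completed = true
	return cp, nil
}

func main() {
	entries := []*Entry{{Path: "a"}, {Path: "b"}, {Path: "c"}}
	failOnce := true
	handle := func(e *Entry) error {
		if e.Path == "b" && failOnce {
			failOnce = false
			return errTransient
		}
		return nil
	}
	// First run stops at "b"; the checkpoint stays at "a".
	cp, err := walk(sliceList(entries), "", handle)
	fmt.Println(cp.LastScannedPath, cp.Completed, err)
	// Second run resumes after "a" and finishes.
	cp, err = walk(sliceList(entries), cp.LastScannedPath, handle)
	fmt.Println(cp.LastScannedPath, cp.Completed, err)
}
```

Because the failing entry is retried on resume, the dispatcher in such a
design would need to tolerate seeing the same entry twice.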

Tests cover: prefix-mismatched skip, not-yet-due skip (reader's job),
date-kind skip, pending_bootstrap skip, multi-action rule (one rule
with three actions dispatches three times — the regression that
per-action keying fixes), dispatch error halts at last-successful
checkpoint, Resume skips entries up to and including the resume
path.
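
One way to read the multi-action regression: if per-action state is keyed
by rule alone, a rule with three actions collapses into a single slot,
whereas keying by (rule, kind) keeps one slot per action. The sketch below
illustrates that difference only. The earlier keying scheme is not shown in
this commit, so countByRule is an assumption about what went wrong, and the
ActionKey here omits the Bucket field the real key carries.

```go
package main

import "fmt"

// ActionKey is a simplified per-action key (the real key also has a Bucket).
type ActionKey struct {
	RuleHash string
	Kind     string
}

// countByRule keys state by rule only, so every action shares one slot.
func countByRule(ruleHash string, kinds []string) int {
	seen := map[string]bool{}
	for range kinds {
		seen[ruleHash] = true
	}
	return len(seen)
}

// countByAction keys state per (rule, kind), so each action keeps its own slot.
func countByAction(ruleHash string, kinds []string) int {
	seen := map[ActionKey]bool{}
	for _, k := range kinds {
		seen[ActionKey{RuleHash: ruleHash, Kind: k}] = true
	}
	return len(seen)
}

func main() {
	kinds := []string{"ExpirationDays", "NoncurrentDays", "AbortMPU"}
	fmt.Println(countByRule("r1", kinds))   // prints 1
	fmt.Println(countByAction("r1", kinds)) // prints 3
}
```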

* test(s3/lifecycle): walker test uses bucket-scoped ActionKey

Mechanical follow-up to the bucket-scoped ActionKey on
lifecycle-engine: the bootstrap walker tests construct ActionKeys to
seed PriorStates and need the Bucket field to match what
engine.Compile keys against.

* fix(s3/lifecycle): walker quick wins

Two minor cleanups noted on review:

- Drop the redundant Resume re-filter inside the Walk callback.
  ListFunc's contract already promises "skip entries with Path <=
  start"; trusting that contract avoids divergence if the filter
  logic ever changes on one side and not the other.

- Hoist the ObjectInfo allocation out of the per-action loop in
  walkEntry. Multi-action rules previously allocated one ObjectInfo
  per (entry, kind) pair; now it's one per entry, reused across all
  matching kinds.

* fix(s3/lifecycle): walker Entry.NoncurrentIndex tracks ObjectInfo's *int

ObjectInfo.NoncurrentIndex is now *int so unset is unambiguous;
mirror that on bootstrap.Entry so the per-entry construction stays
type-clean. Phase 5 (versioned-bucket walks) is the first caller
that will populate the field.

* refactor(s3/lifecycle): trim narration from bootstrap walker

Drop the inline step-by-step on Walk and the multi-paragraph package
preamble; the function names already say it. Keep one-liner WHYs at
the SCAN_AT_DATE skip and the once-per-entry ObjectInfo build.

* fix(s3/lifecycle): walker skips directories and ModeDisabled actions

Two safety findings from review:

1. SeaweedFS directory entries can appear in the listing alongside
   objects; without an IsDirectory check the walker would treat a dir
   like any other entry and could dispatch a delete against it. Add
   IsDirectory to bootstrap.Entry and short-circuit it before
   walkEntry.

2. ModeDisabled is set by the operator (e.g. shell pause) independent
   of the XML rule's Status field. EvaluateAction gates on Status and
   would still fire for an operator-disabled action whose XML status
   is "Enabled". Skip ModeDisabled explicitly in walkEntry alongside
   the existing SCAN_AT_DATE skip.

Two regression tests pin both cases.
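
The two guards can be modelled with a few stand-in types. In the sketch
below, evaluate plays the role of EvaluateAction and, per the description
above, gates only on the XML status; the action and entry structs,
shouldDispatch, and the ModeEventDriven name are hypothetical
simplifications rather than the engine's real definitions.

```go
package main

import "fmt"

// Mode is a simplified operator-controlled mode.
type Mode int

const (
	ModeEventDriven Mode = iota // hypothetical name for the normal mode
	ModeScanAtDate
	ModeDisabled
)

// action pairs the operator mode with the XML rule status.
type action struct {
	Mode       Mode
	XMLEnabled bool
}

// entry is reduced to the fields the guards look at.
type entry struct {
	Path        string
	IsDirectory bool
}

// evaluate stands in for EvaluateAction: it checks XML status only.
func evaluate(a action) bool { return a.XMLEnabled }

// shouldDispatch applies the directory and mode guards before evaluating.
func shouldDispatch(e entry, a action) bool {
	if e.IsDirectory {
		return false // lifecycle never applies to directories
	}
	if a.Mode == ModeScanAtDate || a.Mode == ModeDisabled {
		return false // skipped regardless of XML status
	}
	return evaluate(a)
}

func main() {
	paused := action{Mode: ModeDisabled, XMLEnabled: true}
	active := action{Mode: ModeEventDriven, XMLEnabled: true}
	fmt.Println(shouldDispatch(entry{Path: "x", IsDirectory: true}, active)) // false
	fmt.Println(shouldDispatch(entry{Path: "x/a"}, paused))                  // false
	fmt.Println(shouldDispatch(entry{Path: "x/a"}, active))                  // true
}
```

Without the explicit mode check, the paused action would reach evaluate
and fire, since its XML status is still enabled.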

* perf(s3/lifecycle): reuse ObjectInfo across walker entries

Walker allocated one ObjectInfo struct per entry. For buckets with
millions of objects that's measurable GC pressure. Hoist the
allocation out of the per-entry callback (one per Walk) and reuse
via field assignment in walkEntry.

EvaluateAction reads ObjectInfo synchronously and doesn't retain a
reference, so the reuse is safe — the next iteration's overwrite
can't corrupt an in-flight evaluation.
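
A minimal sketch of the reuse pattern, with stand-in types: objectInfo,
entry, evaluate, and walkAll are invented for the example and carry far
fewer fields than the real structs. The point it tries to show is that
overwriting the whole struct on each iteration leaves no stale fields
behind, provided the evaluator keeps no reference to it. Whether this saves
allocations in practice depends on the compiler's escape analysis, so treat
the performance claim as the commit's, not something this sketch measures.

```go
package main

import "fmt"

// objectInfo is a reduced stand-in for s3lifecycle.ObjectInfo.
type objectInfo struct {
	Key  string
	Size int64
	Tags map[string]string
}

// entry is a reduced stand-in for bootstrap.Entry.
type entry struct {
	Path string
	Size int64
	Tags map[string]string
}

// evaluate reads info synchronously and retains no reference to it.
func evaluate(info *objectInfo) string {
	return fmt.Sprintf("%s:%d:%d", info.Key, info.Size, len(info.Tags))
}

// walkAll declares one objectInfo per walk and overwrites it per entry.
func walkAll(entries []entry) []string {
	var info objectInfo
	out := make([]string, 0, len(entries))
	for _, e := range entries {
		// Full struct assignment resets every field, including Tags.
		info = objectInfo{Key: e.Path, Size: e.Size, Tags: e.Tags}
		out = append(out, evaluate(&info))
	}
	return out
}

func main() {
	got := walkAll([]entry{
		{Path: "a", Size: 1, Tags: map[string]string{"k": "v"}},
		{Path: "b", Size: 2},
	})
	fmt.Println(got) // prints [a:1:1 b:2:0]
}
```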

* refactor(s3/lifecycle): trim narration on walker

Drop the multi-line Entry / ObjectInfo-reuse / SCAN_AT_DATE+DISABLED
explanations. The walker's structure is small enough that the
condition itself reads as the documentation.
This commit is contained in:
Chris Lu
2026-05-07 15:04:51 -07:00
committed by GitHub
parent 5ab3860005
commit 4f79d8e358
3 changed files with 471 additions and 0 deletions

@@ -0,0 +1,153 @@
// Package bootstrap is the bucket-level lifecycle walker. The walker
// iterates every entry under a bucket, evaluates every active ActionKey
// against it, and dispatches inline-delete RPCs for currently-due actions;
// not-yet-due entries are left for the meta-log reader to pick up later.
//
// Callback-driven so the listing source and the LifecycleDelete dispatcher
// can be supplied separately (real client or test fake).
package bootstrap
import (
"context"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/engine"
)
// Entry is the routing-relevant slice of a filer entry. SuccessorModTime
// and NoncurrentIndex are populated only on versioned-bucket walks; the
// retention path bails out conservatively when they're zero / nil.
type Entry struct {
Path string
ModTime time.Time
Size int64
IsDirectory bool
IsLatest bool
IsDeleteMarker bool
IsMPUInit bool
NumVersions int
Tags map[string]string
SuccessorModTime time.Time
NoncurrentIndex *int
}
// ListFunc must skip entries with Path <= start so kill-resume picks up
// where the previous run stopped.
type ListFunc func(ctx context.Context, bucket, start string, cb func(*Entry) error) error
// Dispatcher executes one (action, entry) verdict. An error halts the walk;
// the caller decides whether to retry from the returned Checkpoint.LastScannedPath.
type Dispatcher interface {
Delete(ctx context.Context, action *engine.CompiledAction, entry *Entry) error
}
// Checkpoint is the resume state. Caller persists it under
// /etc/s3/lifecycle/<bucket>/_bootstrap.
type Checkpoint struct {
LastScannedPath string
Completed bool
}
type WalkOptions struct {
Resume string
Now time.Time
}
// Walk iterates entries via list, evaluates each active ActionKey via
// MatchPath + EvaluateAction, and calls Dispatcher.Delete for currently-due
// actions. Directory entries are never evaluated. SCAN_AT_DATE actions are
// skipped (their bootstrap is scheduled separately), as are ModeDisabled actions.
func Walk(ctx context.Context, snap *engine.Snapshot, bucket string, list ListFunc, dispatch Dispatcher, opts WalkOptions) (Checkpoint, error) {
now := opts.Now
if now.IsZero() {
now = time.Now().UTC()
}
cp := Checkpoint{LastScannedPath: opts.Resume}
// Reuse one ObjectInfo across the walk; EvaluateAction reads it
// synchronously without retaining.
var info s3lifecycle.ObjectInfo
err := list(ctx, bucket, opts.Resume, func(entry *Entry) error {
if entry == nil || entry.Path == "" {
return nil
}
// Lifecycle never applies to directory entries.
if entry.IsDirectory {
cp.LastScannedPath = entry.Path
return nil
}
if err := walkEntry(ctx, snap, bucket, entry, dispatch, now, &info); err != nil {
return err
}
cp.LastScannedPath = entry.Path
return nil
})
if err != nil {
return cp, err
}
cp.Completed = true
return cp, nil
}
func walkEntry(ctx context.Context, snap *engine.Snapshot, bucket string, entry *Entry, dispatch Dispatcher, now time.Time, info *s3lifecycle.ObjectInfo) error {
keys := snap.MatchPath(bucket, entry.Path, nil)
if len(keys) == 0 {
return nil
}
*info = s3lifecycle.ObjectInfo{
Key: entry.Path,
ModTime: entry.ModTime,
Size: entry.Size,
IsLatest: entry.IsLatest,
IsDeleteMarker: entry.IsDeleteMarker,
IsMPUInit: entry.IsMPUInit,
NumVersions: entry.NumVersions,
SuccessorModTime: entry.SuccessorModTime,
NoncurrentIndex: entry.NoncurrentIndex,
Tags: entry.Tags,
}
for _, key := range keys {
action := snap.Action(key)
if action == nil {
continue
}
// SCAN_AT_DATE runs its own date-triggered bootstrap. DISABLED can
// be flipped at runtime independent of XML Status; EvaluateAction gates
// only on Status, so skip it explicitly here.
if action.Mode == engine.ModeScanAtDate || action.Mode == engine.ModeDisabled {
continue
}
res := s3lifecycle.EvaluateAction(action.Rule, key.ActionKind, info, now)
if res.Action == s3lifecycle.ActionNone {
continue
}
if err := dispatch.Delete(ctx, action, entry); err != nil {
glog.Warningf("lifecycle bootstrap: dispatch %s/%s kind=%s: %v",
bucket, entry.Path, key.ActionKind, err)
return err
}
}
return nil
}
// EntryCallback wraps an in-memory slice as a ListFunc; useful for tests.
func EntryCallback(entries []*Entry) ListFunc {
return func(ctx context.Context, bucket, start string, cb func(*Entry) error) error {
for _, e := range entries {
if start != "" && e.Path <= start {
continue
}
if err := cb(e); err != nil {
return err
}
}
return nil
}
}
func HasPrefix(path, prefix string) bool { return strings.HasPrefix(path, prefix) }

@@ -0,0 +1,312 @@
package bootstrap
import (
"context"
"errors"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/engine"
)
// recorder captures dispatched (action, entry) pairs for assertion.
type recorder struct {
calls []dispatchCall
err error // when set, every Delete returns this error
}
type dispatchCall struct {
kind s3lifecycle.ActionKind
path string
}
func (r *recorder) Delete(ctx context.Context, action *engine.CompiledAction, entry *Entry) error {
if r.err != nil {
return r.err
}
r.calls = append(r.calls, dispatchCall{kind: action.Key.ActionKind, path: entry.Path})
return nil
}
func mustTime(t *testing.T, s string) time.Time {
t.Helper()
tm, err := time.Parse(time.RFC3339, s)
if err != nil {
t.Fatalf("parse %s: %v", s, err)
}
return tm
}
func compileEvDriven(t *testing.T, bucket string, rules ...*s3lifecycle.Rule) *engine.Snapshot {
t.Helper()
prior := map[s3lifecycle.ActionKey]engine.PriorState{}
for _, r := range rules {
rh := s3lifecycle.RuleHash(r)
for _, k := range s3lifecycle.RuleActionKinds(r) {
prior[s3lifecycle.ActionKey{Bucket: bucket, RuleHash: rh, ActionKind: k}] = engine.PriorState{BootstrapComplete: true}
}
}
e := engine.New()
return e.Compile([]engine.CompileInput{{Bucket: bucket, Rules: rules}}, engine.CompileOptions{PriorStates: prior})
}
func TestWalk_DispatchesDueActions(t *testing.T) {
rule := &s3lifecycle.Rule{
ID: "r",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
Prefix: "logs/",
}
snap := compileEvDriven(t, "bk", rule)
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 60) // past the 30d threshold
entries := []*Entry{
{Path: "data/x", IsLatest: true, ModTime: mod}, // wrong prefix
{Path: "logs/a", IsLatest: true, ModTime: mod}, // due
{Path: "logs/b", IsLatest: true, ModTime: now}, // not yet due (mod=now)
}
rec := &recorder{}
cp, err := Walk(context.Background(), snap, "bk", EntryCallback(entries), rec, WalkOptions{Now: now})
if err != nil {
t.Fatalf("Walk: %v", err)
}
if !cp.Completed {
t.Fatalf("walk should complete")
}
if cp.LastScannedPath != "logs/b" {
t.Fatalf("checkpoint last scanned want logs/b, got %q", cp.LastScannedPath)
}
if len(rec.calls) != 1 || rec.calls[0].path != "logs/a" {
t.Fatalf("dispatched calls want [logs/a], got %v", rec.calls)
}
}
func TestWalk_MultiActionRule_AllDueDispatched(t *testing.T) {
// One rule with three actions, each currently due for a different entry
// shape. The walker must dispatch one Delete per action; this is the
// regression that per-action keying fixes.
rule := &s3lifecycle.Rule{
ID: "multi",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
NoncurrentVersionExpirationDays: 7,
AbortMPUDaysAfterInitiation: 5,
}
snap := compileEvDriven(t, "bk", rule)
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 100) // past every threshold
// A single entry that satisfies all three action shapes is
// unrealistic; in practice each shape is a different entry, so cover
// each shape independently.
entries := []*Entry{
// Current version under ExpirationDays.
{Path: "obj/a", IsLatest: true, ModTime: mod},
// Non-current version under NoncurrentDays.
{Path: "obj/a/.versions/v1", IsLatest: false, ModTime: mod, SuccessorModTime: mod},
// MPU init under AbortMPU.
{Path: ".uploads/u1/", IsMPUInit: true, ModTime: mod},
}
rec := &recorder{}
if _, err := Walk(context.Background(), snap, "bk", EntryCallback(entries), rec, WalkOptions{Now: now}); err != nil {
t.Fatalf("Walk: %v", err)
}
gotKinds := map[s3lifecycle.ActionKind]bool{}
for _, c := range rec.calls {
gotKinds[c.kind] = true
}
for _, want := range []s3lifecycle.ActionKind{
s3lifecycle.ActionKindExpirationDays,
s3lifecycle.ActionKindNoncurrentDays,
s3lifecycle.ActionKindAbortMPU,
} {
if !gotKinds[want] {
t.Fatalf("expected dispatch for %v, got %v", want, rec.calls)
}
}
}
func TestWalk_NotYetDueSkipped(t *testing.T) {
// The reader (Phase 3) is responsible for not-yet-due entries; the
// walker dispatches only currently-due ones, so the meta-log path
// stays the steady-state route.
rule := &s3lifecycle.Rule{
ID: "r",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 30,
}
snap := compileEvDriven(t, "bk", rule)
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 10) // before the 30d threshold
rec := &recorder{}
if _, err := Walk(context.Background(), snap, "bk", EntryCallback([]*Entry{
{Path: "x/a", IsLatest: true, ModTime: mod},
}), rec, WalkOptions{Now: now}); err != nil {
t.Fatalf("Walk: %v", err)
}
if len(rec.calls) != 0 {
t.Fatalf("not-yet-due entry should not dispatch, got %v", rec.calls)
}
}
func TestWalk_DateActionsSkipped(t *testing.T) {
// Date kind is handled by its own SCAN_AT_DATE bootstrap, not by the
// regular bootstrap walker.
date := mustTime(t, "2025-06-15T00:00:00Z")
rule := &s3lifecycle.Rule{
ID: "d",
Status: s3lifecycle.StatusEnabled,
ExpirationDate: date,
}
snap := compileEvDriven(t, "bk", rule)
rec := &recorder{}
if _, err := Walk(context.Background(), snap, "bk", EntryCallback([]*Entry{
{Path: "x/a", IsLatest: true, ModTime: mustTime(t, "2024-01-01T00:00:00Z")},
}), rec, WalkOptions{Now: date.AddDate(0, 1, 0)}); err != nil {
t.Fatalf("Walk: %v", err)
}
if len(rec.calls) != 0 {
t.Fatalf("date kind should not dispatch from walker, got %v", rec.calls)
}
}
func TestWalk_DirectoryEntriesSkipped(t *testing.T) {
// SeaweedFS directory entries can co-exist in the listing; the walker
// must not dispatch deletes against them even when their path matches.
rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1}
snap := compileEvDriven(t, "bk", rule)
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 10)
rec := &recorder{}
if _, err := Walk(context.Background(), snap, "bk", EntryCallback([]*Entry{
{Path: "x", IsDirectory: true, ModTime: mod}, // directory; must skip
{Path: "x/file", IsLatest: true, ModTime: mod},
}), rec, WalkOptions{Now: now}); err != nil {
t.Fatalf("Walk: %v", err)
}
if len(rec.calls) != 1 || rec.calls[0].path != "x/file" {
t.Fatalf("only the file should dispatch, got %v", rec.calls)
}
}
func TestWalk_DisabledModeSkipped(t *testing.T) {
// An operator-flipped ModeDisabled must short-circuit the walker even
// when the XML rule status is "Enabled" and EvaluateAction would
// otherwise fire.
rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1}
rh := s3lifecycle.RuleHash(rule)
prior := map[s3lifecycle.ActionKey]engine.PriorState{
{Bucket: "bk", RuleHash: rh, ActionKind: s3lifecycle.ActionKindExpirationDays}: {
BootstrapComplete: true, Mode: engine.ModeDisabled,
},
}
e := engine.New()
snap := e.Compile([]engine.CompileInput{{Bucket: "bk", Rules: []*s3lifecycle.Rule{rule}}}, engine.CompileOptions{PriorStates: prior})
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 10)
rec := &recorder{}
if _, err := Walk(context.Background(), snap, "bk", EntryCallback([]*Entry{
{Path: "x/a", IsLatest: true, ModTime: mod},
}), rec, WalkOptions{Now: now}); err != nil {
t.Fatalf("Walk: %v", err)
}
if len(rec.calls) != 0 {
t.Fatalf("disabled action must not dispatch, got %v", rec.calls)
}
}
func TestWalk_PendingBootstrapNotDispatched(t *testing.T) {
// Without bootstrap_complete=true in PriorStates, the engine compiles
// the action as inactive. MatchPath filters on IsActive, so the
// walker won't dispatch.
rule := &s3lifecycle.Rule{
ID: "r",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 1,
}
e := engine.New()
snap := e.Compile([]engine.CompileInput{{Bucket: "bk", Rules: []*s3lifecycle.Rule{rule}}}, engine.CompileOptions{})
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 10)
rec := &recorder{}
if _, err := Walk(context.Background(), snap, "bk", EntryCallback([]*Entry{
{Path: "x/a", IsLatest: true, ModTime: mod},
}), rec, WalkOptions{Now: now}); err != nil {
t.Fatalf("Walk: %v", err)
}
if len(rec.calls) != 0 {
t.Fatalf("inactive action should not dispatch, got %v", rec.calls)
}
}
func TestWalk_DispatchErrorHaltsAtCheckpoint(t *testing.T) {
rule := &s3lifecycle.Rule{
ID: "r",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 1,
}
snap := compileEvDriven(t, "bk", rule)
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 10)
entries := []*Entry{
{Path: "a", IsLatest: true, ModTime: mod},
{Path: "b", IsLatest: true, ModTime: mod},
{Path: "c", IsLatest: true, ModTime: mod},
}
wantErr := errors.New("dispatch boom")
rec := &recorder{err: wantErr}
cp, err := Walk(context.Background(), snap, "bk", EntryCallback(entries), rec, WalkOptions{Now: now})
if !errors.Is(err, wantErr) {
t.Fatalf("want dispatch error, got %v", err)
}
if cp.Completed {
t.Fatalf("walk should not be Completed on dispatch failure")
}
// Walker stops on first failure; checkpoint stays at whatever was
// recorded BEFORE the failed entry. Path "a" is the failing entry,
// so LastScannedPath stays at the resume point (empty here).
if cp.LastScannedPath != "" {
t.Fatalf("checkpoint should not advance past failing entry, got %q", cp.LastScannedPath)
}
}
func TestWalk_ResumeFromCheckpoint(t *testing.T) {
rule := &s3lifecycle.Rule{
ID: "r",
Status: s3lifecycle.StatusEnabled,
ExpirationDays: 1,
}
snap := compileEvDriven(t, "bk", rule)
mod := mustTime(t, "2024-01-01T00:00:00Z")
now := mod.AddDate(0, 0, 10)
entries := []*Entry{
{Path: "a", IsLatest: true, ModTime: mod},
{Path: "b", IsLatest: true, ModTime: mod},
{Path: "c", IsLatest: true, ModTime: mod},
}
rec := &recorder{}
cp, err := Walk(context.Background(), snap, "bk", EntryCallback(entries), rec, WalkOptions{Now: now, Resume: "b"})
if err != nil {
t.Fatalf("Walk: %v", err)
}
if !cp.Completed {
t.Fatalf("walk should complete")
}
// Only "c" is processed (entries with Path <= "b" are skipped).
if len(rec.calls) != 1 || rec.calls[0].path != "c" {
t.Fatalf("Resume should only process c, got %v", rec.calls)
}
if cp.LastScannedPath != "c" {
t.Fatalf("checkpoint want c, got %q", cp.LastScannedPath)
}
}

@@ -16,6 +16,12 @@ const (
TaskTypeBalance TaskType = "balance"
TaskTypeReplication TaskType = "replication"
TaskTypeECBalance TaskType = "ec_balance"
// TaskTypeS3Lifecycle covers all three lifecycle subtypes (READ, BOOTSTRAP,
// DRAIN) — one job type, dispatched by S3LifecycleParams.Subtype inside
// the handler. The lane mapping in admin/plugin/scheduler_lane.go binds
// "s3_lifecycle" -> LaneLifecycle so lifecycle work runs in its own
// scheduling lane and never blocks the default-lane admin lock.
TaskTypeS3Lifecycle TaskType = "s3_lifecycle"
)
// TaskStatus represents the status of a maintenance task