feat(s3/lifecycle): wire walker into executeDailyReplay

Activates the recovery-branch walker. The handler composes the three
Phase 4b building blocks — FilerListFunc + WalkerDispatcher + WalkBuckets
— into a dailyrun.WalkerFunc and passes it via Config.Walker. The
bucket list is derived from the compiled inputs so it matches the
engine snapshot exactly.

Effect on master behavior: when a worker observes a RuleSetHash or
PromotedHash mismatch on its persisted cursor (rule content edited /
partition flip), runShard now walks the live filer tree under the
RecoveryView before rewinding the cursor. Already-due objects across
the rewritten rule set fire immediately instead of waiting on the
sliding meta-log replay.

This change is still scoped to replay-eligible action kinds because
checkSnapshotForUnsupported continues to reject walker-bound rules
(ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent) and
scan_only-promoted rules at the top of Run. The follow-up commit will
relax the gate once the steady-state walker over RulesForShard's
walk view is wired, so that those rules fire every day, not just on
rule edits.
This commit is contained in:
Chris Lu
2026-05-11 23:17:29 -07:00
parent e3c5f624b9
commit 20f2b71a4a

View File

@@ -319,10 +319,29 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe
limiter, limiterDesc := buildLimiterFromClusterContext(request.GetClusterContext())
// Reuse one LifecycleClient across the replay drain and the
// walker's per-entry dispatch.
client := lifecycleRPCAdapter{c: rpc}
// Bucket list for the walker — derived from inputs so it matches
// the snapshot the engine compiled.
buckets := make([]string, 0, len(inputs))
for _, in := range inputs {
if in.Bucket != "" {
buckets = append(buckets, in.Bucket)
}
}
walkerListFn := dailyrun.FilerListFunc(filerClient, bucketsPath)
walkerDispatch := &dailyrun.WalkerDispatcher{Client: client}
walker := dailyrun.WalkerFunc(func(walkCtx context.Context, view *engine.Snapshot, shardID int) error {
return dailyrun.WalkBuckets(walkCtx, view, shardID, buckets, walkerListFn, walkerDispatch)
})
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId, JobType: jobType,
State: plugin_pb.JobState_JOB_STATE_RUNNING, Stage: "starting",
Message: fmt.Sprintf("daily_replay shards=%d workers=%d runtime=%s rate=%s", len(shards), cfg.Workers, cfg.MaxRuntime, limiterDesc),
Message: fmt.Sprintf("daily_replay shards=%d workers=%d runtime=%s rate=%s buckets=%d walker=on",
len(shards), cfg.Workers, cfg.MaxRuntime, limiterDesc, len(buckets)),
})
runErr := dailyrun.Run(ctx, dailyrun.Config{
@@ -330,11 +349,12 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe
BucketsPath: bucketsPath,
Engine: eng,
FilerClient: filerClient,
Client: lifecycleRPCAdapter{c: rpc},
Client: client,
Persister: &dailyrun.FilerCursorPersister{Store: dispatcher.NewFilerStoreClient(filerClient)},
Lister: dispatcher.NewFilerSiblingLister(filerClient, bucketsPath),
Workers: cfg.Workers,
Limiter: limiter,
Walker: walker,
ClientName: "worker-s3-lifecycle-daily",
})
if dailyrun.IsUnsupportedRule(runErr) {