From 20f2b71a4a4e74699fbd307d71f60bd6fed7d071 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 11 May 2026 23:17:29 -0700 Subject: [PATCH] feat(s3/lifecycle): wire walker into executeDailyReplay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Activates the recovery-branch walker. The handler composes the three Phase 4b building blocks — FilerListFunc + WalkerDispatcher + WalkBuckets — into a dailyrun.WalkerFunc and passes it via Config.Walker. The bucket list is derived from the compiled inputs so it matches the engine snapshot exactly. Effect on master behavior: when a worker observes a RuleSetHash or PromotedHash mismatch on its persisted cursor (rule content edited / partition flip), runShard now walks the live filer tree under the RecoveryView before rewinding the cursor. Already-due objects across the rewritten rule set fire immediately instead of waiting on the sliding meta-log replay. Still scoped to replay-eligible action kinds because checkSnapshotForUnsupported continues to reject walker-bound rules (ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent) and scan_only-promoted rules at the top of Run. The follow-up commit relaxes the gate once the steady-state walker over RulesForShard's walk view is wired so those rules fire every day, not just on rule edits. --- weed/worker/tasks/s3_lifecycle/handler.go | 24 +++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/weed/worker/tasks/s3_lifecycle/handler.go b/weed/worker/tasks/s3_lifecycle/handler.go index 98c2404fd..7bd75707f 100644 --- a/weed/worker/tasks/s3_lifecycle/handler.go +++ b/weed/worker/tasks/s3_lifecycle/handler.go @@ -319,10 +319,29 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe limiter, limiterDesc := buildLimiterFromClusterContext(request.GetClusterContext()) + // Reuse one LifecycleClient across the replay drain and the + // walker's per-entry dispatch. + client := lifecycleRPCAdapter{c: rpc} + + // Bucket list for the walker — derived from inputs so it matches + // the snapshot the engine compiled. + buckets := make([]string, 0, len(inputs)) + for _, in := range inputs { + if in.Bucket != "" { + buckets = append(buckets, in.Bucket) + } + } + walkerListFn := dailyrun.FilerListFunc(filerClient, bucketsPath) + walkerDispatch := &dailyrun.WalkerDispatcher{Client: client} + walker := dailyrun.WalkerFunc(func(walkCtx context.Context, view *engine.Snapshot, shardID int) error { + return dailyrun.WalkBuckets(walkCtx, view, shardID, buckets, walkerListFn, walkerDispatch) + }) + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: request.Job.JobId, JobType: jobType, State: plugin_pb.JobState_JOB_STATE_RUNNING, Stage: "starting", - Message: fmt.Sprintf("daily_replay shards=%d workers=%d runtime=%s rate=%s", len(shards), cfg.Workers, cfg.MaxRuntime, limiterDesc), + Message: fmt.Sprintf("daily_replay shards=%d workers=%d runtime=%s rate=%s buckets=%d walker=on", + len(shards), cfg.Workers, cfg.MaxRuntime, limiterDesc, len(buckets)), }) runErr := dailyrun.Run(ctx, dailyrun.Config{ @@ -330,11 +349,12 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe BucketsPath: bucketsPath, Engine: eng, FilerClient: filerClient, - Client: lifecycleRPCAdapter{c: rpc}, + Client: client, Persister: &dailyrun.FilerCursorPersister{Store: dispatcher.NewFilerStoreClient(filerClient)}, Lister: dispatcher.NewFilerSiblingLister(filerClient, bucketsPath), Workers: cfg.Workers, Limiter: limiter, + Walker: walker, ClientName: "worker-s3-lifecycle-daily", }) if dailyrun.IsUnsupportedRule(runErr) {