diff --git a/weed/worker/tasks/s3_lifecycle/handler.go b/weed/worker/tasks/s3_lifecycle/handler.go index 98c2404fd..7bd75707f 100644 --- a/weed/worker/tasks/s3_lifecycle/handler.go +++ b/weed/worker/tasks/s3_lifecycle/handler.go @@ -319,10 +319,29 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe limiter, limiterDesc := buildLimiterFromClusterContext(request.GetClusterContext()) + // Reuse one LifecycleClient across the replay drain and the + // walker's per-entry dispatch. + client := lifecycleRPCAdapter{c: rpc} + + // Bucket list for the walker — derived from inputs so it matches + // the snapshot the engine compiled. + buckets := make([]string, 0, len(inputs)) + for _, in := range inputs { + if in.Bucket != "" { + buckets = append(buckets, in.Bucket) + } + } + walkerListFn := dailyrun.FilerListFunc(filerClient, bucketsPath) + walkerDispatch := &dailyrun.WalkerDispatcher{Client: client} + walker := dailyrun.WalkerFunc(func(walkCtx context.Context, view *engine.Snapshot, shardID int) error { + return dailyrun.WalkBuckets(walkCtx, view, shardID, buckets, walkerListFn, walkerDispatch) + }) + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: request.Job.JobId, JobType: jobType, State: plugin_pb.JobState_JOB_STATE_RUNNING, Stage: "starting", - Message: fmt.Sprintf("daily_replay shards=%d workers=%d runtime=%s rate=%s", len(shards), cfg.Workers, cfg.MaxRuntime, limiterDesc), + Message: fmt.Sprintf("daily_replay shards=%d workers=%d runtime=%s rate=%s buckets=%d walker=on", + len(shards), cfg.Workers, cfg.MaxRuntime, limiterDesc, len(buckets)), }) runErr := dailyrun.Run(ctx, dailyrun.Config{ @@ -330,11 +349,12 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe BucketsPath: bucketsPath, Engine: eng, FilerClient: filerClient, - Client: lifecycleRPCAdapter{c: rpc}, + Client: client, Persister: &dailyrun.FilerCursorPersister{Store: dispatcher.NewFilerStoreClient(filerClient)}, Lister: dispatcher.NewFilerSiblingLister(filerClient, bucketsPath), Workers: cfg.Workers, Limiter: limiter, + Walker: walker, ClientName: "worker-s3-lifecycle-daily", }) if dailyrun.IsUnsupportedRule(runErr) {