fix(s3/lifecycle): divide cluster budget by active workers, not all capable

gemini pointed out that s3_lifecycle has MaxJobsPerDetection=1
(handler.go:189) — it's a singleton job, only one worker is ever active.
Dividing the cluster_deletes_per_second budget by the count of capable
executors gave the single active worker just 1/N of the configured cap.

Pass adminRuntime.MaxJobsPerDetection through to the decorator. Divisor
is now min(executors, maxJobsPerDetection), clamped to >=1. For
s3_lifecycle (maxJobs=1) the active worker gets the full budget; for a
hypothetical parallel-dispatch job (maxJobs>1) the budget divides
across the running-set.

Tests swap the SharedEvenly case for two pinned scenarios:
  - SingletonJobGetsFullBudget: maxJobs=1 across 4 executors => 100/1
  - SharedEvenlyWhenParallelLimited: maxJobs=4 across 4 executors => 25/worker
  - MaxJobsExceedsExecutors: maxJobs=10 across 4 executors => divisor 4
This commit is contained in:
Chris Lu
2026-05-11 19:05:56 -07:00
parent c51db540cc
commit b85af3483e
3 changed files with 70 additions and 23 deletions

View File

@@ -40,7 +40,16 @@ const (
// Today only s3_lifecycle decorates; the function exists so a future
// job type's plumbing slots in alongside without touching
// executeJobWithExecutor.
func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobType string, adminConfigValues map[string]*plugin_pb.ConfigValue) *plugin_pb.ClusterContext {
//
// maxJobsPerDetection is the job-type's AdminRuntimeConfig.MaxJobsPerDetection
// — the cap on how many parallel instances of this job the scheduler will
// dispatch per detection cycle. For singleton jobs (s3_lifecycle has
// MaxJobsPerDetection=1) only one worker is ever active, so the cluster
// budget must go to that worker undivided. For parallel-dispatch jobs the
// budget divides across the actually-running set, not across every
// capable worker. The divisor is min(executors, maxJobsPerDetection),
// clamped to ≥1.
func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobType string, adminConfigValues map[string]*plugin_pb.ConfigValue, maxJobsPerDetection int) *plugin_pb.ClusterContext {
if cc == nil {
return cc
}
@@ -62,10 +71,17 @@ func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobT
glog.V(2).Infof("decorateClusterContext: %s rps=%d but no execute-capable workers; skipping allocation", jobType, rps)
return cc
}
perWorkerRps := float64(rps) / float64(executors)
// Divide by the number of *concurrently-running* workers, not the
// number of capable ones. A singleton job (maxJobs=1) gets the full
// budget on its single active worker.
activeWorkers := executors
if maxJobsPerDetection > 0 && maxJobsPerDetection < activeWorkers {
activeWorkers = maxJobsPerDetection
}
perWorkerRps := float64(rps) / float64(activeWorkers)
perWorkerBurst := 0
if burst > 0 {
perWorkerBurst = burst / executors
perWorkerBurst = burst / activeWorkers
if perWorkerBurst < 1 {
perWorkerBurst = 1
}
@@ -81,8 +97,8 @@ func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobT
if perWorkerBurst > 0 {
out.Metadata[s3LifecycleMetadataDeletesBurst] = strconv.Itoa(perWorkerBurst)
}
glog.V(3).Infof("decorateClusterContext: %s rps=%d burst=%d executors=%d -> per-worker rps=%g burst=%d",
jobType, rps, burst, executors, perWorkerRps, perWorkerBurst)
glog.V(3).Infof("decorateClusterContext: %s rps=%d burst=%d executors=%d maxJobs=%d active=%d -> per-worker rps=%g burst=%d",
jobType, rps, burst, executors, maxJobsPerDetection, activeWorkers, perWorkerRps, perWorkerBurst)
return out
}

View File

@@ -59,7 +59,7 @@ func TestDecorateClusterContext_NonS3LifecycleIsPassThrough(t *testing.T) {
// is pass-through.
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
in := &plugin_pb.ClusterContext{Metadata: map[string]string{"unrelated": "v"}}
out := r.decorateClusterContextForJob(in, "some_other_job", adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100))
out := r.decorateClusterContextForJob(in, "some_other_job", adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1)
assert.Same(t, in, out, "non-allocator job type must return the same pointer")
}
@@ -70,7 +70,7 @@ func TestDecorateClusterContext_RpsZeroSkipsAllocation(t *testing.T) {
// worker into a no-throughput state on a misconfigured cluster.
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 0))
out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 0), 1)
if out.Metadata != nil {
_, hasRps := out.Metadata[s3LifecycleMetadataDeletesPerSecond]
assert.False(t, hasRps, "rps=0 must not write a deletes_per_second key")
@@ -80,26 +80,55 @@ func TestDecorateClusterContext_RpsZeroSkipsAllocation(t *testing.T) {
func TestDecorateClusterContext_NoExecutorsSkipsAllocation(t *testing.T) {
r := pluginWithExecutors(t, s3LifecycleJobType, 0)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100))
out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1)
if out.Metadata != nil {
_, hasRps := out.Metadata[s3LifecycleMetadataDeletesPerSecond]
assert.False(t, hasRps, "0 executors must not write share metadata (would divide by zero)")
}
}
func TestDecorateClusterContext_SharedEvenly(t *testing.T) {
func TestDecorateClusterContext_SingletonJobGetsFullBudget(t *testing.T) {
// s3_lifecycle has MaxJobsPerDetection=1: only ONE worker runs the
// job at a time. The cluster budget must go to that worker undivided
// — dividing by N capable executors would starve the active worker
// to 1/N of the configured rps. Pin the singleton behavior.
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100))
out := r.decorateClusterContextForJob(in, s3LifecycleJobType,
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1)
require.NotNil(t, out.Metadata)
assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "100 / 4 executors = 25/worker")
assert.Equal(t, "100", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "singleton job: full budget to the single active worker")
}
func TestDecorateClusterContext_BurstShared(t *testing.T) {
func TestDecorateClusterContext_SharedEvenlyWhenParallelLimited(t *testing.T) {
// Hypothetical parallel-dispatch job type (maxJobs=4): budget
// divides across the running-set, which equals min(executors,
// maxJobs)=4. 100/4=25.
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType,
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 4)
require.NotNil(t, out.Metadata)
assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "maxJobs=4 across 4 executors = 25/worker")
}
func TestDecorateClusterContext_MaxJobsExceedsExecutors(t *testing.T) {
// maxJobs=10 but only 4 executors exist — the divisor is the
// smaller value (executors) since you can't run more jobs in
// parallel than there are workers to run them.
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType,
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 10)
require.NotNil(t, out.Metadata)
assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond])
}
func TestDecorateClusterContext_BurstSharedWhenParallel(t *testing.T) {
r := pluginWithExecutors(t, s3LifecycleJobType, 2)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType,
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 20))
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 20), 2)
require.NotNil(t, out.Metadata)
assert.Equal(t, "50", out.Metadata[s3LifecycleMetadataDeletesPerSecond])
assert.Equal(t, "10", out.Metadata[s3LifecycleMetadataDeletesBurst])
@@ -112,18 +141,18 @@ func TestDecorateClusterContext_BurstZeroOmitsKey(t *testing.T) {
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType,
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 0))
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 0), 4)
_, hasBurst := out.Metadata[s3LifecycleMetadataDeletesBurst]
assert.False(t, hasBurst, "burst=0 must NOT write the burst key (worker default kicks in)")
}
func TestDecorateClusterContext_BurstFloorIsOneWhenDividesBelowOne(t *testing.T) {
// burst=1 across 4 workers would round to 0; clamp to 1 so the
// limiter doesn't become "single-token bucket that never refills."
// burst=1 across 4 active workers would round to 0; clamp to 1 so
// the limiter doesn't become "single-token bucket that never refills."
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
in := &plugin_pb.ClusterContext{}
out := r.decorateClusterContextForJob(in, s3LifecycleJobType,
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 1))
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 1), 4)
assert.Equal(t, "1", out.Metadata[s3LifecycleMetadataDeletesBurst])
}
@@ -135,7 +164,7 @@ func TestDecorateClusterContext_DoesNotMutateInput(t *testing.T) {
baseMeta := map[string]string{"existing": "value"}
in := &plugin_pb.ClusterContext{Metadata: baseMeta}
_ = r.decorateClusterContextForJob(in, s3LifecycleJobType,
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100))
adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1)
_, leaked := baseMeta[s3LifecycleMetadataDeletesPerSecond]
assert.False(t, leaked, "decorator must not mutate the input metadata map")
assert.Equal(t, "value", baseMeta["existing"])
@@ -143,6 +172,6 @@ func TestDecorateClusterContext_DoesNotMutateInput(t *testing.T) {
func TestDecorateClusterContext_NilInputPassesThrough(t *testing.T) {
r := pluginWithExecutors(t, s3LifecycleJobType, 4)
out := r.decorateClusterContextForJob(nil, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100))
out := r.decorateClusterContextForJob(nil, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1)
assert.Nil(t, out)
}

View File

@@ -650,10 +650,12 @@ func (r *Plugin) executeJobWithExecutor(
}
// Apply per-job-type cluster-allocation decoration (e.g. s3_lifecycle
// divides cluster_deletes_per_second by the worker count and ships
// the share via ClusterContext.Metadata). No-op for job types
// without an allocator registered.
clusterContext = r.decorateClusterContextForJob(clusterContext, job.JobType, adminConfigValues)
// divides cluster_deletes_per_second by min(workers, maxJobsPerDetection)
// and ships the share via ClusterContext.Metadata). No-op for job
// types without an allocator registered. MaxJobsPerDetection caps
// the divisor so a singleton job (maxJobs=1) gets the full budget on
// the single active worker, not 1/N of it.
clusterContext = r.decorateClusterContextForJob(clusterContext, job.JobType, adminConfigValues, int(adminRuntime.GetMaxJobsPerDetection()))
completedCh := make(chan *plugin_pb.JobCompleted, 1)
r.pendingExecutionMu.Lock()