diff --git a/weed/admin/plugin/cluster_rate_limit.go b/weed/admin/plugin/cluster_rate_limit.go index 2ee251a41..e1802a5f7 100644 --- a/weed/admin/plugin/cluster_rate_limit.go +++ b/weed/admin/plugin/cluster_rate_limit.go @@ -40,7 +40,16 @@ const ( // Today only s3_lifecycle decorates; the function exists so a future // job type's plumbing slots in alongside without touching // executeJobWithExecutor. -func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobType string, adminConfigValues map[string]*plugin_pb.ConfigValue) *plugin_pb.ClusterContext { +// +// maxJobsPerDetection is the job-type's AdminRuntimeConfig.MaxJobsPerDetection +// — the cap on how many parallel instances of this job the scheduler will +// dispatch per detection cycle. For singleton jobs (s3_lifecycle has +// MaxJobsPerDetection=1) only one worker is ever active, so the cluster +// budget must go to that worker undivided. For parallel-dispatch jobs the +// budget divides across the actually-running set, not across every +// capable worker. The divisor is min(executors, maxJobsPerDetection), +// clamped to ≥1. +func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobType string, adminConfigValues map[string]*plugin_pb.ConfigValue, maxJobsPerDetection int) *plugin_pb.ClusterContext { if cc == nil { return cc } @@ -62,10 +71,17 @@ func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobT glog.V(2).Infof("decorateClusterContext: %s rps=%d but no execute-capable workers; skipping allocation", jobType, rps) return cc } - perWorkerRps := float64(rps) / float64(executors) + // Divide by the number of *concurrently-running* workers, not the + // number of capable ones. A singleton job (maxJobs=1) gets the full + // budget on its single active worker. + activeWorkers := executors + if maxJobsPerDetection > 0 && maxJobsPerDetection < activeWorkers { + activeWorkers = maxJobsPerDetection + } + perWorkerRps := float64(rps) / float64(activeWorkers) perWorkerBurst := 0 if burst > 0 { - perWorkerBurst = burst / executors + perWorkerBurst = burst / activeWorkers if perWorkerBurst < 1 { perWorkerBurst = 1 } @@ -81,8 +97,8 @@ func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobT if perWorkerBurst > 0 { out.Metadata[s3LifecycleMetadataDeletesBurst] = strconv.Itoa(perWorkerBurst) } - glog.V(3).Infof("decorateClusterContext: %s rps=%d burst=%d executors=%d -> per-worker rps=%g burst=%d", - jobType, rps, burst, executors, perWorkerRps, perWorkerBurst) + glog.V(3).Infof("decorateClusterContext: %s rps=%d burst=%d executors=%d maxJobs=%d active=%d -> per-worker rps=%g burst=%d", + jobType, rps, burst, executors, maxJobsPerDetection, activeWorkers, perWorkerRps, perWorkerBurst) return out } diff --git a/weed/admin/plugin/cluster_rate_limit_test.go b/weed/admin/plugin/cluster_rate_limit_test.go index 4407e6d8c..9a336db40 100644 --- a/weed/admin/plugin/cluster_rate_limit_test.go +++ b/weed/admin/plugin/cluster_rate_limit_test.go @@ -59,7 +59,7 @@ func TestDecorateClusterContext_NonS3LifecycleIsPassThrough(t *testing.T) { // is pass-through. r := pluginWithExecutors(t, s3LifecycleJobType, 4) in := &plugin_pb.ClusterContext{Metadata: map[string]string{"unrelated": "v"}} - out := r.decorateClusterContextForJob(in, "some_other_job", adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100)) + out := r.decorateClusterContextForJob(in, "some_other_job", adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) assert.Same(t, in, out, "non-allocator job type must return the same pointer") } @@ -70,7 +70,7 @@ func TestDecorateClusterContext_RpsZeroSkipsAllocation(t *testing.T) { // worker into a no-throughput state on a misconfigured cluster. r := pluginWithExecutors(t, s3LifecycleJobType, 4) in := &plugin_pb.ClusterContext{} - out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 0)) + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 0), 1) if out.Metadata != nil { _, hasRps := out.Metadata[s3LifecycleMetadataDeletesPerSecond] assert.False(t, hasRps, "rps=0 must not write a deletes_per_second key") @@ -80,26 +80,55 @@ func TestDecorateClusterContext_RpsZeroSkipsAllocation(t *testing.T) { func TestDecorateClusterContext_NoExecutorsSkipsAllocation(t *testing.T) { r := pluginWithExecutors(t, s3LifecycleJobType, 0) in := &plugin_pb.ClusterContext{} - out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100)) + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) if out.Metadata != nil { _, hasRps := out.Metadata[s3LifecycleMetadataDeletesPerSecond] assert.False(t, hasRps, "0 executors must not write share metadata (would divide by zero)") } } -func TestDecorateClusterContext_SharedEvenly(t *testing.T) { +func TestDecorateClusterContext_SingletonJobGetsFullBudget(t *testing.T) { + // s3_lifecycle has MaxJobsPerDetection=1: only ONE worker runs the + // job at a time. The cluster budget must go to that worker undivided + // — dividing by N capable executors would starve the active worker + // to 1/N of the configured rps. Pin the singleton behavior. r := pluginWithExecutors(t, s3LifecycleJobType, 4) in := &plugin_pb.ClusterContext{} - out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100)) + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) require.NotNil(t, out.Metadata) - assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "100 / 4 executors = 25/worker") + assert.Equal(t, "100", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "singleton job: full budget to the single active worker") } -func TestDecorateClusterContext_BurstShared(t *testing.T) { +func TestDecorateClusterContext_SharedEvenlyWhenParallelLimited(t *testing.T) { + // Hypothetical parallel-dispatch job type (maxJobs=4): budget + // divides across the running-set, which equals min(executors, + // maxJobs)=4. 100/4=25. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 4) + require.NotNil(t, out.Metadata) + assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "maxJobs=4 across 4 executors = 25/worker") +} + +func TestDecorateClusterContext_MaxJobsExceedsExecutors(t *testing.T) { + // maxJobs=10 but only 4 executors exist — the divisor is the + // smaller value (executors) since you can't run more jobs in + // parallel than there are workers to run them. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 10) + require.NotNil(t, out.Metadata) + assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond]) +} + +func TestDecorateClusterContext_BurstSharedWhenParallel(t *testing.T) { r := pluginWithExecutors(t, s3LifecycleJobType, 2) in := &plugin_pb.ClusterContext{} out := r.decorateClusterContextForJob(in, s3LifecycleJobType, - adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 20)) + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 20), 2) require.NotNil(t, out.Metadata) assert.Equal(t, "50", out.Metadata[s3LifecycleMetadataDeletesPerSecond]) assert.Equal(t, "10", out.Metadata[s3LifecycleMetadataDeletesBurst]) @@ -112,18 +141,18 @@ func TestDecorateClusterContext_BurstZeroOmitsKey(t *testing.T) { r := pluginWithExecutors(t, s3LifecycleJobType, 4) in := &plugin_pb.ClusterContext{} out := r.decorateClusterContextForJob(in, s3LifecycleJobType, - adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 0)) + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 0), 4) _, hasBurst := out.Metadata[s3LifecycleMetadataDeletesBurst] assert.False(t, hasBurst, "burst=0 must NOT write the burst key (worker default kicks in)") } func TestDecorateClusterContext_BurstFloorIsOneWhenDividesBelowOne(t *testing.T) { - // burst=1 across 4 workers would round to 0; clamp to 1 so the - // limiter doesn't become "single-token bucket that never refills." + // burst=1 across 4 active workers would round to 0; clamp to 1 so + // the limiter doesn't become "single-token bucket that never refills." r := pluginWithExecutors(t, s3LifecycleJobType, 4) in := &plugin_pb.ClusterContext{} out := r.decorateClusterContextForJob(in, s3LifecycleJobType, - adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 1)) + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 1), 4) assert.Equal(t, "1", out.Metadata[s3LifecycleMetadataDeletesBurst]) } @@ -135,7 +164,7 @@ func TestDecorateClusterContext_DoesNotMutateInput(t *testing.T) { baseMeta := map[string]string{"existing": "value"} in := &plugin_pb.ClusterContext{Metadata: baseMeta} _ = r.decorateClusterContextForJob(in, s3LifecycleJobType, - adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100)) + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) _, leaked := baseMeta[s3LifecycleMetadataDeletesPerSecond] assert.False(t, leaked, "decorator must not mutate the input metadata map") assert.Equal(t, "value", baseMeta["existing"]) @@ -143,6 +172,6 @@ func TestDecorateClusterContext_DoesNotMutateInput(t *testing.T) { func TestDecorateClusterContext_NilInputPassesThrough(t *testing.T) { r := pluginWithExecutors(t, s3LifecycleJobType, 4) - out := r.decorateClusterContextForJob(nil, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100)) + out := r.decorateClusterContextForJob(nil, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) assert.Nil(t, out) } diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 370484e44..d50d7d2a3 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -650,10 +650,12 @@ func (r *Plugin) executeJobWithExecutor( } // Apply per-job-type cluster-allocation decoration (e.g. s3_lifecycle - // divides cluster_deletes_per_second by the worker count and ships - // the share via ClusterContext.Metadata). No-op for job types - // without an allocator registered. - clusterContext = r.decorateClusterContextForJob(clusterContext, job.JobType, adminConfigValues) + // divides cluster_deletes_per_second by min(workers, maxJobsPerDetection) + // and ships the share via ClusterContext.Metadata). No-op for job + // types without an allocator registered. MaxJobsPerDetection caps + // the divisor so a singleton job (maxJobs=1) gets the full budget on + // the single active worker, not 1/N of it. + clusterContext = r.decorateClusterContextForJob(clusterContext, job.JobType, adminConfigValues, int(adminRuntime.GetMaxJobsPerDetection())) completedCh := make(chan *plugin_pb.JobCompleted, 1) r.pendingExecutionMu.Lock()