fix: unconditional lock release and context-aware executor reservation

Address PR review findings:
- Make defer releaseLock() unconditional in runSchedulerIteration since
  acquireAdminLock guarantees a non-nil release func on success.
- Add ctx parameter to reserveScheduledExecutor so it honors the
  per-job-type budget context, preventing waits from exceeding the
  30-minute budget.
- Add waitForShutdownOrCtx helper that also selects on ctx.Done().
- Update all reserveScheduledExecutor call sites in tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Chris Lu
2026-03-03 18:49:28 -08:00
parent 51d715b4e6
commit 35786d72e0
2 changed files with 32 additions and 13 deletions

View File

@@ -138,9 +138,7 @@ func (r *Plugin) runSchedulerIteration() bool {
r.finishIteration(false)
return false
}
if releaseLock != nil {
defer releaseLock()
}
defer releaseLock()
// Load cluster context ONCE for all job types.
clusterContext, err := r.loadSchedulerClusterContext()
@@ -523,7 +521,7 @@ func (r *Plugin) dispatchScheduledProposals(
default:
}
executor, release, reserveErr := r.reserveScheduledExecutor(jobType, policy)
executor, release, reserveErr := r.reserveScheduledExecutor(parentCtx, jobType, policy)
if reserveErr != nil {
select {
case <-r.shutdownCh:
@@ -588,6 +586,7 @@ func (r *Plugin) dispatchScheduledProposals(
}
func (r *Plugin) reserveScheduledExecutor(
ctx context.Context,
jobType string,
policy schedulerPolicy,
) (*WorkerSession, func(), error) {
@@ -600,6 +599,8 @@ func (r *Plugin) reserveScheduledExecutor(
select {
case <-r.shutdownCh:
return nil, nil, fmt.Errorf("plugin is shutting down")
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
@@ -609,8 +610,8 @@ func (r *Plugin) reserveScheduledExecutor(
executors, err := r.registry.ListExecutors(jobType)
if err != nil {
if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) {
return nil, nil, fmt.Errorf("plugin is shutting down")
if !waitForShutdownOrCtx(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) {
return nil, nil, fmt.Errorf("plugin is shutting down or context canceled")
}
continue
}
@@ -623,8 +624,8 @@ func (r *Plugin) reserveScheduledExecutor(
return executor, release, nil
}
if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) {
return nil, nil, fmt.Errorf("plugin is shutting down")
if !waitForShutdownOrCtx(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) {
return nil, nil, fmt.Errorf("plugin is shutting down or context canceled")
}
}
}
@@ -892,6 +893,24 @@ func waitForShutdownOrTimer(shutdown <-chan struct{}, duration time.Duration) bo
}
}
func waitForShutdownOrCtx(shutdown <-chan struct{}, ctx context.Context, duration time.Duration) bool {
if duration <= 0 {
return true
}
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-shutdown:
return false
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
// filterProposalsWithActiveJobs removes proposals whose dedupe keys already have active jobs.
// It first expires stale tracked jobs via expireStaleJobs, which can mutate scheduler state,
// so callers should treat this method as a stateful operation.

View File

@@ -131,13 +131,13 @@ func TestReserveScheduledExecutorRespectsPerWorkerLimit(t *testing.T) {
ExecutorReserveBackoff: time.Millisecond,
}
executor1, release1, err := pluginSvc.reserveScheduledExecutor("balance", policy)
executor1, release1, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy)
if err != nil {
t.Fatalf("reserve executor 1: %v", err)
}
defer release1()
executor2, release2, err := pluginSvc.reserveScheduledExecutor("balance", policy)
executor2, release2, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy)
if err != nil {
t.Fatalf("reserve executor 2: %v", err)
}
@@ -259,7 +259,7 @@ func TestReserveScheduledExecutorTimesOutWhenNoExecutor(t *testing.T) {
start := time.Now()
pluginSvc.Shutdown()
_, _, err = pluginSvc.reserveScheduledExecutor("missing-job-type", policy)
_, _, err = pluginSvc.reserveScheduledExecutor(context.Background(), "missing-job-type", policy)
if err == nil {
t.Fatalf("expected reservation shutdown error")
}
@@ -290,7 +290,7 @@ func TestReserveScheduledExecutorWaitsForWorkerCapacity(t *testing.T) {
ExecutorReserveBackoff: 5 * time.Millisecond,
}
_, release1, err := pluginSvc.reserveScheduledExecutor("balance", policy)
_, release1, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy)
if err != nil {
t.Fatalf("reserve executor 1: %v", err)
}
@@ -301,7 +301,7 @@ func TestReserveScheduledExecutorWaitsForWorkerCapacity(t *testing.T) {
}
secondReserveCh := make(chan reserveResult, 1)
go func() {
_, release2, reserveErr := pluginSvc.reserveScheduledExecutor("balance", policy)
_, release2, reserveErr := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy)
if release2 != nil {
release2()
}