From 35786d72e0804186e46c92ab2d3761af97b7cfa2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 3 Mar 2026 18:49:28 -0800 Subject: [PATCH] fix: unconditional lock release and context-aware executor reservation Address PR review findings: - Make defer releaseLock() unconditional in runSchedulerIteration since acquireAdminLock guarantees a non-nil release func on success. - Add ctx parameter to reserveScheduledExecutor so it honors the per-job-type budget context, preventing waits from exceeding the 30-minute budget. - Add waitForShutdownOrCtx helper that also selects on ctx.Done(). - Update all reserveScheduledExecutor call sites in tests. Co-Authored-By: Claude Opus 4.6 --- weed/admin/plugin/plugin_scheduler.go | 35 +++++++++++++++++----- weed/admin/plugin/plugin_scheduler_test.go | 10 +++---- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 60aac274e..844428374 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -138,9 +138,7 @@ func (r *Plugin) runSchedulerIteration() bool { r.finishIteration(false) return false } - if releaseLock != nil { - defer releaseLock() - } + defer releaseLock() // Load cluster context ONCE for all job types. clusterContext, err := r.loadSchedulerClusterContext() @@ -523,7 +521,7 @@ func (r *Plugin) dispatchScheduledProposals( default: } - executor, release, reserveErr := r.reserveScheduledExecutor(jobType, policy) + executor, release, reserveErr := r.reserveScheduledExecutor(parentCtx, jobType, policy) if reserveErr != nil { select { case <-r.shutdownCh: @@ -588,6 +586,7 @@ func (r *Plugin) dispatchScheduledProposals( } func (r *Plugin) reserveScheduledExecutor( + ctx context.Context, jobType string, policy schedulerPolicy, ) (*WorkerSession, func(), error) { @@ -600,6 +599,8 @@ func (r *Plugin) reserveScheduledExecutor( select { case <-r.shutdownCh: return nil, nil, fmt.Errorf("plugin is shutting down") + case <-ctx.Done(): + return nil, nil, ctx.Err() default: } @@ -609,8 +610,8 @@ func (r *Plugin) reserveScheduledExecutor( executors, err := r.registry.ListExecutors(jobType) if err != nil { - if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) { - return nil, nil, fmt.Errorf("plugin is shutting down") + if !waitForShutdownOrCtx(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) { + return nil, nil, fmt.Errorf("plugin is shutting down or context canceled") } continue } @@ -623,8 +624,8 @@ func (r *Plugin) reserveScheduledExecutor( return executor, release, nil } - if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) { - return nil, nil, fmt.Errorf("plugin is shutting down") + if !waitForShutdownOrCtx(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) { + return nil, nil, fmt.Errorf("plugin is shutting down or context canceled") } } } @@ -892,6 +893,24 @@ func waitForShutdownOrTimer(shutdown <-chan struct{}, duration time.Duration) bo } } +func waitForShutdownOrCtx(shutdown <-chan struct{}, ctx context.Context, duration time.Duration) bool { + if duration <= 0 { + return true + } + + timer := time.NewTimer(duration) + defer timer.Stop() + + select { + case <-shutdown: + return false + case <-ctx.Done(): + return false + case <-timer.C: + return true + } +} + // filterProposalsWithActiveJobs removes proposals whose dedupe keys already have active jobs. // It first expires stale tracked jobs via expireStaleJobs, which can mutate scheduler state, // so callers should treat this method as a stateful operation. diff --git a/weed/admin/plugin/plugin_scheduler_test.go b/weed/admin/plugin/plugin_scheduler_test.go index 848fe15ff..8db5c9e3b 100644 --- a/weed/admin/plugin/plugin_scheduler_test.go +++ b/weed/admin/plugin/plugin_scheduler_test.go @@ -131,13 +131,13 @@ func TestReserveScheduledExecutorRespectsPerWorkerLimit(t *testing.T) { ExecutorReserveBackoff: time.Millisecond, } - executor1, release1, err := pluginSvc.reserveScheduledExecutor("balance", policy) + executor1, release1, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if err != nil { t.Fatalf("reserve executor 1: %v", err) } defer release1() - executor2, release2, err := pluginSvc.reserveScheduledExecutor("balance", policy) + executor2, release2, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if err != nil { t.Fatalf("reserve executor 2: %v", err) } @@ -259,7 +259,7 @@ func TestReserveScheduledExecutorTimesOutWhenNoExecutor(t *testing.T) { start := time.Now() pluginSvc.Shutdown() - _, _, err = pluginSvc.reserveScheduledExecutor("missing-job-type", policy) + _, _, err = pluginSvc.reserveScheduledExecutor(context.Background(), "missing-job-type", policy) if err == nil { t.Fatalf("expected reservation shutdown error") } @@ -290,7 +290,7 @@ func TestReserveScheduledExecutorWaitsForWorkerCapacity(t *testing.T) { ExecutorReserveBackoff: 5 * time.Millisecond, } - _, release1, err := pluginSvc.reserveScheduledExecutor("balance", policy) + _, release1, err := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if err != nil { t.Fatalf("reserve executor 1: %v", err) } @@ -301,7 +301,7 @@ func TestReserveScheduledExecutorWaitsForWorkerCapacity(t *testing.T) { } secondReserveCh := make(chan reserveResult, 1) go func() { - _, release2, reserveErr := pluginSvc.reserveScheduledExecutor("balance", policy) + _, release2, reserveErr := pluginSvc.reserveScheduledExecutor(context.Background(), "balance", policy) if release2 != nil { release2() }