Files
seaweedfs/weed/admin/plugin/scheduler_status_test.go
Chris Lu 5d43f84df7 refactor(plugin): rename detection_interval_seconds → detection_interval_minutes (#9366)
Minutes is the natural granularity for detection cadence — every
production handler already set the seconds field to a 60-multiple
(17*60, 30*60, 3600, 24*60*60). Switching to minutes drops the *60
arithmetic and matches the unit conventions used elsewhere in the
plugin worker forms.

- Proto: AdminRuntimeDefaults + AdminRuntimeConfig.detection_interval_*
  field renamed.
- Helpers: durationFromMinutes / minutesFromDuration alongside the
  existing seconds variants in plugin_scheduler.go.
- Handlers: vacuum, ec_balance, balance, erasure_coding, iceberg,
  admin_script, s3_lifecycle now declare DetectionIntervalMinutes.
- Admin: scheduler_status + types + UI templ + plugin_api.go pass
  through the new field; UI label and table cells switch to "min".
2026-05-08 10:33:02 -07:00

148 lines
4.1 KiB
Go

package plugin
import (
"context"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
)
func TestGetSchedulerStatusIncludesInProcessJobs(t *testing.T) {
pluginSvc, err := New(Options{})
if err != nil {
t.Fatalf("New: %v", err)
}
defer pluginSvc.Shutdown()
pluginSvc.trackExecutionStart("req-1", "worker-a", &plugin_pb.JobSpec{
JobId: "job-1",
JobType: "vacuum",
}, 1)
status := pluginSvc.GetSchedulerStatus()
if len(status.InProcessJobs) != 1 {
t.Fatalf("expected one in-process job, got %d", len(status.InProcessJobs))
}
if status.InProcessJobs[0].JobID != "job-1" {
t.Fatalf("unexpected job id: %s", status.InProcessJobs[0].JobID)
}
}
func TestGetSchedulerStatusIncludesLastDetectionCount(t *testing.T) {
pluginSvc, err := New(Options{})
if err != nil {
t.Fatalf("New: %v", err)
}
defer pluginSvc.Shutdown()
const jobType = "vacuum"
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
WorkerId: "worker-a",
Capabilities: []*plugin_pb.JobTypeCapability{
{JobType: jobType, CanDetect: true},
},
})
pluginSvc.recordSchedulerDetectionSuccess(jobType, 3)
status := pluginSvc.GetSchedulerStatus()
found := false
for _, jt := range status.JobTypes {
if jt.JobType != jobType {
continue
}
found = true
if jt.LastDetectedCount != 3 {
t.Fatalf("unexpected last detected count: got=%d want=3", jt.LastDetectedCount)
}
if jt.LastDetectedAt == nil {
t.Fatalf("expected last detected at to be set")
}
}
if !found {
t.Fatalf("expected job type status for %s", jobType)
}
}
func TestGetLaneSchedulerStatusShowsActiveConcurrentLaneWork(t *testing.T) {
clusterContextStarted := make(chan struct{})
releaseClusterContext := make(chan struct{})
// Create the Plugin without a ClusterContextProvider so no background
// scheduler goroutines are started; they would race with the direct
// runJobTypeIteration call below.
pluginSvc, err := New(Options{})
if err != nil {
t.Fatalf("New: %v", err)
}
defer pluginSvc.Shutdown()
// Set the provider after construction so runJobTypeIteration can use it.
pluginSvc.clusterContextProvider = func(context.Context) (*plugin_pb.ClusterContext, error) {
close(clusterContextStarted)
<-releaseClusterContext
return nil, context.Canceled
}
const jobType = "s3_lifecycle"
err = pluginSvc.SaveJobTypeConfig(&plugin_pb.PersistedJobTypeConfig{
JobType: jobType,
AdminRuntime: &plugin_pb.AdminRuntimeConfig{
Enabled: true,
DetectionIntervalMinutes: 30,
DetectionTimeoutSeconds: 15,
},
})
if err != nil {
t.Fatalf("SaveJobTypeConfig: %v", err)
}
policy, enabled, err := pluginSvc.loadSchedulerPolicy(jobType)
if err != nil {
t.Fatalf("loadSchedulerPolicy: %v", err)
}
if !enabled {
t.Fatalf("expected enabled policy")
}
done := make(chan struct{})
go func() {
defer close(done)
pluginSvc.runJobTypeIteration(jobType, policy)
}()
select {
case <-clusterContextStarted:
case <-time.After(time.Second):
t.Fatalf("timed out waiting for job type iteration to start")
}
var laneStatus SchedulerStatus
var aggregateStatus SchedulerStatus
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
laneStatus = pluginSvc.GetLaneSchedulerStatus(LaneLifecycle)
aggregateStatus = pluginSvc.GetSchedulerStatus()
if laneStatus.CurrentJobType == jobType && laneStatus.CurrentPhase == "detecting" &&
aggregateStatus.CurrentJobType == jobType && aggregateStatus.CurrentPhase == "detecting" {
break
}
time.Sleep(10 * time.Millisecond)
}
if laneStatus.CurrentJobType != jobType || laneStatus.CurrentPhase != "detecting" {
t.Fatalf("unexpected lane status while work is active: job=%q phase=%q", laneStatus.CurrentJobType, laneStatus.CurrentPhase)
}
if aggregateStatus.CurrentJobType != jobType || aggregateStatus.CurrentPhase != "detecting" {
t.Fatalf("unexpected aggregate status while work is active: job=%q phase=%q", aggregateStatus.CurrentJobType, aggregateStatus.CurrentPhase)
}
close(releaseClusterContext)
select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("timed out waiting for job type iteration to finish")
}
}