mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-16 23:01:30 +00:00
383 lines
9.8 KiB
Go
383 lines
9.8 KiB
Go
package plugin
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Dispatcher orchestrates job detection scheduling and dispatch
|
|
type Dispatcher struct {
|
|
mu sync.RWMutex
|
|
registry *Registry
|
|
queue *JobQueue
|
|
detectionSchedules map[string]*DetectionSchedule
|
|
jobTypeStateManagement map[string]*JobTypeState
|
|
lastDetectionTime map[string]time.Time
|
|
detectionConcurrencyLimit map[string]int
|
|
}
|
|
|
|
// DetectionSchedule tracks when a detection type should next run and
// accumulates simple run statistics for it.
type DetectionSchedule struct {
	DetectionType string        // name the schedule was registered under
	Interval      time.Duration // delay added to the enqueue time to get the next run

	LastExecuted      time.Time // stamped when a job of this type completes or fails
	NextExecutionTime time.Time // the type is due once the current time is past this

	ExecutionCount     int64   // jobs successfully enqueued by ScheduleDetections
	FailureCount       int64   // jobs of this type reported through FailJob
	AverageExecutionMs float64 // NOTE(review): not updated anywhere in this file
}
|
|
|
|
// JobTypeState holds live counters and recent timing data for one job type.
// JobType is fixed at construction; every other field is read and written
// under mu.
type JobTypeState struct {
	JobType string

	mu sync.RWMutex

	// Job counters.
	ActiveCount    int // dispatched to a plugin and not yet completed/failed
	MaxConcurrent  int // ScheduleDetections skips the type while ActiveCount >= this
	PendingCount   int // enqueued by ScheduleDetections and not yet dispatched
	CompletedCount int
	FailedCount    int

	// Outcome of the most recent completed or failed run.
	LastError         string
	LastExecutionTime time.Time

	// Sliding window of recent execution durations and their mean.
	AverageExecutionMs float64
	ExecutionHistory   []time.Duration
	MaxHistorySize     int // number of samples kept in ExecutionHistory
}
|
|
|
|
// NewDispatcher creates a new job dispatcher
|
|
func NewDispatcher(registry *Registry, queue *JobQueue) *Dispatcher {
|
|
return &Dispatcher{
|
|
registry: registry,
|
|
queue: queue,
|
|
detectionSchedules: make(map[string]*DetectionSchedule),
|
|
jobTypeStateManagement: make(map[string]*JobTypeState),
|
|
lastDetectionTime: make(map[string]time.Time),
|
|
detectionConcurrencyLimit: make(map[string]int),
|
|
}
|
|
}
|
|
|
|
// RegisterDetectionType registers a detection type with scheduling info
|
|
func (d *Dispatcher) RegisterDetectionType(detectionType string, interval time.Duration, maxConcurrent int) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, exists := d.detectionSchedules[detectionType]; exists {
|
|
return fmt.Errorf("detection type %s already registered", detectionType)
|
|
}
|
|
|
|
d.detectionSchedules[detectionType] = &DetectionSchedule{
|
|
DetectionType: detectionType,
|
|
Interval: interval,
|
|
NextExecutionTime: time.Now(),
|
|
}
|
|
|
|
d.detectionConcurrencyLimit[detectionType] = maxConcurrent
|
|
d.jobTypeStateManagement[detectionType] = &JobTypeState{
|
|
JobType: detectionType,
|
|
MaxConcurrent: maxConcurrent,
|
|
MaxHistorySize: 100,
|
|
ExecutionHistory: make([]time.Duration, 0, 100),
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UnregisterDetectionType removes a detection type
|
|
func (d *Dispatcher) UnregisterDetectionType(detectionType string) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
if _, exists := d.detectionSchedules[detectionType]; !exists {
|
|
return fmt.Errorf("detection type %s not found", detectionType)
|
|
}
|
|
|
|
delete(d.detectionSchedules, detectionType)
|
|
delete(d.detectionConcurrencyLimit, detectionType)
|
|
delete(d.jobTypeStateManagement, detectionType)
|
|
delete(d.lastDetectionTime, detectionType)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ScheduleDetections checks and schedules detection jobs that are due
|
|
func (d *Dispatcher) ScheduleDetections() []string {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
var scheduledJobs []string
|
|
now := time.Now()
|
|
|
|
for detectionType, schedule := range d.detectionSchedules {
|
|
if now.After(schedule.NextExecutionTime) {
|
|
// Check if we haven't exceeded concurrency limit
|
|
state := d.jobTypeStateManagement[detectionType]
|
|
state.mu.RLock()
|
|
activeCount := state.ActiveCount
|
|
maxConcurrent := state.MaxConcurrent
|
|
state.mu.RUnlock()
|
|
|
|
if activeCount >= maxConcurrent {
|
|
continue // Skip this detection type for now
|
|
}
|
|
|
|
// Create and enqueue job
|
|
jobID := fmt.Sprintf("det-%s-%d", detectionType, now.UnixNano())
|
|
job := &Job{
|
|
ID: jobID,
|
|
Type: detectionType,
|
|
State: JobStatePending,
|
|
CreatedAt: now,
|
|
}
|
|
|
|
if err := d.queue.Enqueue(job); err != nil {
|
|
continue
|
|
}
|
|
|
|
// Update schedule
|
|
schedule.NextExecutionTime = now.Add(schedule.Interval)
|
|
schedule.ExecutionCount++
|
|
d.lastDetectionTime[detectionType] = now
|
|
|
|
scheduledJobs = append(scheduledJobs, jobID)
|
|
|
|
// Update state
|
|
state.mu.Lock()
|
|
state.PendingCount++
|
|
state.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
return scheduledJobs
|
|
}
|
|
|
|
// DispatchJob assigns a job to an available plugin
|
|
func (d *Dispatcher) DispatchJob(job *Job) (string, error) {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
|
|
// Find plugins capable of handling this job type
|
|
plugins := d.registry.GetPluginsByCapability(job.Type)
|
|
if len(plugins) == 0 {
|
|
return "", fmt.Errorf("no plugins available for job type %s", job.Type)
|
|
}
|
|
|
|
// Find least loaded available plugin
|
|
var selectedPlugin *ConnectedPlugin
|
|
minLoad := int(^uint32(0) >> 1)
|
|
|
|
for _, plugin := range plugins {
|
|
if plugin.IsHealthy(30 * time.Second) {
|
|
plugin.mu.RLock()
|
|
if plugin.ActiveJobs < plugin.MaxConcurrentJobs && plugin.ActiveJobs < minLoad {
|
|
selectedPlugin = plugin
|
|
minLoad = plugin.ActiveJobs
|
|
}
|
|
plugin.mu.RUnlock()
|
|
}
|
|
}
|
|
|
|
if selectedPlugin == nil {
|
|
return "", fmt.Errorf("no healthy plugins available for job type %s", job.Type)
|
|
}
|
|
|
|
// Assign job to plugin
|
|
job.PluginID = selectedPlugin.ID
|
|
job.SetState(JobStateScheduled)
|
|
selectedPlugin.IncActiveJobs()
|
|
|
|
// Update job type state
|
|
state := d.jobTypeStateManagement[job.Type]
|
|
state.mu.Lock()
|
|
state.ActiveCount++
|
|
state.PendingCount--
|
|
state.mu.Unlock()
|
|
|
|
return selectedPlugin.ID, nil
|
|
}
|
|
|
|
// CompleteJob marks a job as completed
|
|
func (d *Dispatcher) CompleteJob(job *Job, result *JobResult) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
job.Result = result
|
|
job.SetState(JobStateCompleted)
|
|
|
|
// Update plugin
|
|
if plugin, err := d.registry.GetPlugin(job.PluginID); err == nil {
|
|
plugin.DecActiveJobs()
|
|
}
|
|
|
|
// Update state
|
|
if state, exists := d.jobTypeStateManagement[job.Type]; exists {
|
|
state.mu.Lock()
|
|
state.ActiveCount--
|
|
state.CompletedCount++
|
|
if job.ExecutionTime > 0 {
|
|
state.ExecutionHistory = append(state.ExecutionHistory, job.ExecutionTime)
|
|
if len(state.ExecutionHistory) > state.MaxHistorySize {
|
|
state.ExecutionHistory = state.ExecutionHistory[1:]
|
|
}
|
|
d.updateAverageExecutionTime(state)
|
|
}
|
|
state.LastExecutionTime = time.Now()
|
|
state.mu.Unlock()
|
|
}
|
|
|
|
// Update detection schedule if applicable
|
|
if schedule, exists := d.detectionSchedules[job.Type]; exists {
|
|
schedule.LastExecuted = time.Now()
|
|
}
|
|
|
|
// Record execution
|
|
record := &ExecutionRecord{
|
|
JobID: job.ID,
|
|
JobType: job.Type,
|
|
PluginID: job.PluginID,
|
|
State: job.State,
|
|
CreatedAt: job.CreatedAt,
|
|
StartedAt: job.StartedAt,
|
|
CompletedAt: job.CompletedAt,
|
|
Result: result,
|
|
}
|
|
d.queue.RecordExecution(record)
|
|
|
|
return nil
|
|
}
|
|
|
|
// FailJob marks a job as failed
|
|
func (d *Dispatcher) FailJob(job *Job, errorMsg string) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
job.LastError = errorMsg
|
|
job.SetState(JobStateFailed)
|
|
|
|
// Update plugin
|
|
if plugin, err := d.registry.GetPlugin(job.PluginID); err == nil {
|
|
plugin.DecActiveJobs()
|
|
}
|
|
|
|
// Update state
|
|
if state, exists := d.jobTypeStateManagement[job.Type]; exists {
|
|
state.mu.Lock()
|
|
state.ActiveCount--
|
|
state.FailedCount++
|
|
state.LastError = errorMsg
|
|
state.LastExecutionTime = time.Now()
|
|
state.mu.Unlock()
|
|
}
|
|
|
|
// Update detection schedule
|
|
if schedule, exists := d.detectionSchedules[job.Type]; exists {
|
|
schedule.FailureCount++
|
|
schedule.LastExecuted = time.Now()
|
|
}
|
|
|
|
// Record execution
|
|
record := &ExecutionRecord{
|
|
JobID: job.ID,
|
|
JobType: job.Type,
|
|
PluginID: job.PluginID,
|
|
State: job.State,
|
|
CreatedAt: job.CreatedAt,
|
|
StartedAt: job.StartedAt,
|
|
CompletedAt: job.CompletedAt,
|
|
LastError: errorMsg,
|
|
}
|
|
d.queue.RecordExecution(record)
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateAverageExecutionTime recalculates average execution time from history
|
|
func (d *Dispatcher) updateAverageExecutionTime(state *JobTypeState) {
|
|
if len(state.ExecutionHistory) == 0 {
|
|
state.AverageExecutionMs = 0
|
|
return
|
|
}
|
|
|
|
var total int64
|
|
for _, duration := range state.ExecutionHistory {
|
|
total += duration.Milliseconds()
|
|
}
|
|
state.AverageExecutionMs = float64(total) / float64(len(state.ExecutionHistory))
|
|
}
|
|
|
|
// GetJobTypeState returns the state for a specific job type
|
|
func (d *Dispatcher) GetJobTypeState(jobType string) *JobTypeState {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
|
|
if state, exists := d.jobTypeStateManagement[jobType]; exists {
|
|
return state
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetAllJobTypeStates returns all job type states
|
|
func (d *Dispatcher) GetAllJobTypeStates() map[string]*JobTypeState {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
|
|
result := make(map[string]*JobTypeState)
|
|
for jobType, state := range d.jobTypeStateManagement {
|
|
result[jobType] = state
|
|
}
|
|
return result
|
|
}
|
|
|
|
// GetDetectionSchedule returns the schedule for a detection type
|
|
func (d *Dispatcher) GetDetectionSchedule(detectionType string) *DetectionSchedule {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
|
|
if schedule, exists := d.detectionSchedules[detectionType]; exists {
|
|
return schedule
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetDueDetections returns all detection types that are due for execution
|
|
func (d *Dispatcher) GetDueDetections() []string {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
|
|
var due []string
|
|
now := time.Now()
|
|
|
|
for detectionType, schedule := range d.detectionSchedules {
|
|
if now.After(schedule.NextExecutionTime) {
|
|
due = append(due, detectionType)
|
|
}
|
|
}
|
|
|
|
return due
|
|
}
|
|
|
|
// GetDispatcherStats returns overall dispatcher statistics
|
|
func (d *Dispatcher) GetDispatcherStats() map[string]interface{} {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
|
|
totalActive := 0
|
|
totalCompleted := 0
|
|
totalFailed := 0
|
|
|
|
for _, state := range d.jobTypeStateManagement {
|
|
state.mu.RLock()
|
|
totalActive += state.ActiveCount
|
|
totalCompleted += state.CompletedCount
|
|
totalFailed += state.FailedCount
|
|
state.mu.RUnlock()
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"detection_types_registered": len(d.detectionSchedules),
|
|
"total_active_jobs": totalActive,
|
|
"total_completed_jobs": totalCompleted,
|
|
"total_failed_jobs": totalFailed,
|
|
"job_type_states": len(d.jobTypeStateManagement),
|
|
}
|
|
}
|