package plugin

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Manager is the main component orchestrating the plugin system.
type Manager struct {
	mu sync.RWMutex

	registry   *Registry
	queue      *JobQueue
	dispatcher *Dispatcher
	configMgr  *ConfigManager
	grpcServer *GRPCServer

	isRunning bool
	stopChan  chan struct{}

	// Tickers for the background loops, created by the loops themselves.
	scheduleTicker    *time.Ticker
	healthCheckTicker *time.Ticker
	detectionTicker   *time.Ticker

	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc
	config *ManagerConfig
}

// ManagerConfig holds configuration for the plugin manager.
type ManagerConfig struct {
	ConfigDir              string        // directory for persisted plugin configurations
	ScheduleInterval       time.Duration // how often detection jobs are scheduled
	HealthCheckInterval    time.Duration // how often plugin health is probed
	DetectionInterval      time.Duration // how often queued jobs are dispatched
	MaxQueueSize           int           // upper bound on pending jobs
	MaxHistorySize         int           // upper bound on retained execution records
	DeduplicationTTL       time.Duration // window in which duplicate jobs are dropped
	HealthCheckTimeout     time.Duration // timeout applied to individual health checks
	FailureDetectionWindow time.Duration // window for counting health-check failures
	FailureThreshold       int           // failures within the window before unregistering
}

// DefaultManagerConfig returns the default configuration.
func DefaultManagerConfig(configDir string) *ManagerConfig {
	return &ManagerConfig{
		ConfigDir:              configDir,
		ScheduleInterval:       5 * time.Second,
		HealthCheckInterval:    30 * time.Second,
		DetectionInterval:      10 * time.Second,
		MaxQueueSize:           10000,
		MaxHistorySize:         5000,
		DeduplicationTTL:       1 * time.Minute,
		HealthCheckTimeout:     90 * time.Second,
		FailureDetectionWindow: 5 * time.Minute,
		FailureThreshold:       3,
	}
}
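
// Example (illustrative sketch, not from the original source): start from the
// defaults and override only what differs for a deployment; the values below
// are hypothetical tuning choices.
//
//	cfg := DefaultManagerConfig("/etc/seaweedfs/plugins")
//	cfg.ScheduleInterval = 10 * time.Second
//	cfg.MaxQueueSize = 50000
//	mgr, err := NewManager(cfg)
//	if err != nil {
//		// handle error
//	}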

// NewManager creates a new plugin manager instance.
func NewManager(config *ManagerConfig) (*Manager, error) {
	if config == nil {
		return nil, fmt.Errorf("config is required")
	}

	// Create configuration manager
	configMgr, err := NewConfigManager(config.ConfigDir)
	if err != nil {
		return nil, fmt.Errorf("failed to create config manager: %w", err)
	}

	// Create registry
	registry := NewRegistry(
		config.HealthCheckTimeout,
		config.FailureDetectionWindow,
		config.FailureThreshold,
	)

	// Create job queue
	queue := NewJobQueue(config.MaxHistorySize, config.DeduplicationTTL)

	// Create dispatcher
	dispatcher := NewDispatcher(registry, queue)

	// Create gRPC server
	grpcServer := NewGRPCServer(registry, queue, dispatcher, configMgr)

	ctx, cancel := context.WithCancel(context.Background())

	manager := &Manager{
		registry:   registry,
		queue:      queue,
		dispatcher: dispatcher,
		configMgr:  configMgr,
		grpcServer: grpcServer,
		stopChan:   make(chan struct{}),
		ctx:        ctx,
		cancel:     cancel,
		config:     config,
	}

	return manager, nil
}

// Start initializes and starts the plugin manager.
func (m *Manager) Start() error {
	m.mu.Lock()
	if m.isRunning {
		m.mu.Unlock()
		return fmt.Errorf("manager already running")
	}
	m.isRunning = true
	m.mu.Unlock()

	// Load existing configurations
	if err := m.configMgr.LoadAllConfigs(); err != nil {
		// Roll back the running flag under the lock to avoid a data race.
		m.mu.Lock()
		m.isRunning = false
		m.mu.Unlock()
		return fmt.Errorf("failed to load configurations: %w", err)
	}

	// Start background tasks
	m.wg.Add(3)
	go m.schedulerLoop()
	go m.healthCheckLoop()
	go m.detectionLoop()

	return nil
}

// Stop gracefully stops the plugin manager.
func (m *Manager) Stop() error {
	m.mu.Lock()
	if !m.isRunning {
		m.mu.Unlock()
		return fmt.Errorf("manager not running")
	}
	m.isRunning = false
	m.mu.Unlock()

	// Signal all goroutines to stop
	m.cancel()
	close(m.stopChan)

	// Wait for all goroutines to finish, but do not block forever.
	done := make(chan struct{})
	go func() {
		m.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-time.After(30 * time.Second):
		return fmt.Errorf("timeout waiting for manager to stop")
	}
}
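
// Example (illustrative sketch): a typical lifecycle. Start launches the
// background loops; Stop signals them and blocks until they exit or a
// 30-second timeout elapses.
//
//	mgr, err := NewManager(DefaultManagerConfig(configDir))
//	if err != nil {
//		return err
//	}
//	if err := mgr.Start(); err != nil {
//		return err
//	}
//	defer mgr.Stop()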

// schedulerLoop periodically schedules detection jobs.
func (m *Manager) schedulerLoop() {
	defer m.wg.Done()

	m.scheduleTicker = time.NewTicker(m.config.ScheduleInterval)
	defer m.scheduleTicker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-m.stopChan:
			return
		case <-m.scheduleTicker.C:
			m.performScheduling()
		}
	}
}

// performScheduling executes the scheduling logic.
func (m *Manager) performScheduling() {
	// ScheduleDetections enqueues any detection jobs that are due; the
	// returned slice is informational only, so it is deliberately ignored.
	m.dispatcher.ScheduleDetections()
}

// healthCheckLoop periodically checks plugin health.
func (m *Manager) healthCheckLoop() {
	defer m.wg.Done()

	m.healthCheckTicker = time.NewTicker(m.config.HealthCheckInterval)
	defer m.healthCheckTicker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-m.stopChan:
			return
		case <-m.healthCheckTicker.C:
			m.performHealthCheck()
		}
	}
}

// performHealthCheck checks the health of all plugins and unregisters any
// that have exceeded the failure threshold.
func (m *Manager) performHealthCheck() {
	plugins := m.registry.ListPlugins(true)

	for _, p := range plugins {
		isHealthy, _ := m.registry.HealthCheck(p.ID)
		if !isHealthy && m.registry.HasExceededFailureThreshold(p.ID) {
			m.registry.UnregisterPlugin(p.ID)
		}
	}
}

// detectionLoop periodically triggers detection execution.
func (m *Manager) detectionLoop() {
	defer m.wg.Done()

	m.detectionTicker = time.NewTicker(m.config.DetectionInterval)
	defer m.detectionTicker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-m.stopChan:
			return
		case <-m.detectionTicker.C:
			m.processDetectionJobs()
		}
	}
}

// processDetectionJobs dequeues and dispatches pending jobs.
func (m *Manager) processDetectionJobs() {
	for {
		job := m.queue.Dequeue()
		if job == nil {
			break
		}

		// Dispatch the job to an available plugin; if no plugin can take it,
		// put it back and stop draining the queue until the next tick.
		pluginID, err := m.dispatcher.DispatchJob(job)
		if err != nil {
			m.queue.Enqueue(job)
			break
		}

		job.PluginID = pluginID
	}
}

// IsRunning reports whether the manager is currently running.
func (m *Manager) IsRunning() bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.isRunning
}

// RegisterDetectionType registers a new detection type with its scheduling
// interval and concurrency limit.
func (m *Manager) RegisterDetectionType(detectionType string, interval time.Duration, maxConcurrent int) error {
	return m.dispatcher.RegisterDetectionType(detectionType, interval, maxConcurrent)
}
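
// Example (illustrative sketch): "volume.vacuum" is a hypothetical type name,
// not one defined by this package.
//
//	// Scan for vacuum candidates every minute, at most 4 jobs in flight.
//	if err := mgr.RegisterDetectionType("volume.vacuum", time.Minute, 4); err != nil {
//		return err
//	}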

// UnregisterDetectionType unregisters a detection type.
func (m *Manager) UnregisterDetectionType(detectionType string) error {
	return m.dispatcher.UnregisterDetectionType(detectionType)
}

// GetStats returns overall statistics for the registry, queue, and dispatcher.
func (m *Manager) GetStats() map[string]interface{} {
	return map[string]interface{}{
		"registry":   m.registry.GetStats(),
		"queue":      m.queue.GetExecutionStats(),
		"dispatcher": m.dispatcher.GetDispatcherStats(),
		"running":    m.IsRunning(),
	}
}
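
// Example (illustrative sketch): the stats map can back an admin endpoint,
// assuming the nested stats values are JSON-serializable; statsHandler is a
// hypothetical helper, not part of this package.
//
//	func statsHandler(mgr *plugin.Manager) http.HandlerFunc {
//		return func(w http.ResponseWriter, r *http.Request) {
//			w.Header().Set("Content-Type", "application/json")
//			_ = json.NewEncoder(w).Encode(mgr.GetStats())
//		}
//	}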

// GetPluginStats returns statistics for a specific plugin.
func (m *Manager) GetPluginStats(pluginID string) (map[string]interface{}, error) {
	plugin, err := m.registry.GetPlugin(pluginID)
	if err != nil {
		return nil, err
	}

	plugin.mu.RLock()
	defer plugin.mu.RUnlock()

	return map[string]interface{}{
		"id":                    plugin.ID,
		"name":                  plugin.Name,
		"version":               plugin.Version,
		"status":                plugin.Status,
		"capabilities":          plugin.Capabilities,
		"active_jobs":           plugin.ActiveJobs,
		"completed_jobs":        plugin.CompletedJobs,
		"failed_jobs":           plugin.FailedJobs,
		"total_detections":      plugin.TotalDetections,
		"avg_execution_time_ms": plugin.AvgExecutionTimeMs,
		"cpu_usage_percent":     plugin.CPUUsagePercent,
		"memory_usage_bytes":    plugin.MemoryUsageBytes,
		"connected_at":          plugin.ConnectedAt,
		"last_heartbeat":        plugin.LastHeartbeat,
		"uptime_seconds":        int(time.Since(plugin.ConnectedAt).Seconds()),
	}, nil
}

// ListPlugins returns all registered plugins; pass includeUnhealthy to also
// include plugins that are failing health checks.
func (m *Manager) ListPlugins(includeUnhealthy bool) []*ConnectedPlugin {
	return m.registry.ListPlugins(includeUnhealthy)
}

// ListJobs returns up to limit entries of job history.
func (m *Manager) ListJobs(limit int) []*ExecutionRecord {
	return m.queue.GetHistory(limit)
}

// ListJobsForPlugin returns job history for a specific plugin.
func (m *Manager) ListJobsForPlugin(pluginID string, limit int) []*ExecutionRecord {
	return m.queue.GetHistoryForPlugin(pluginID, limit)
}

// ListJobsForType returns job history for a specific job type.
func (m *Manager) ListJobsForType(jobType string, limit int) []*ExecutionRecord {
	return m.queue.GetHistoryForJobType(jobType, limit)
}

// TriggerDetection manually triggers detection for the given types and
// returns the IDs of the jobs that were successfully enqueued; jobs that fail
// to enqueue (e.g. deduplicated) are silently skipped.
func (m *Manager) TriggerDetection(detectionTypes []string) ([]string, error) {
	var jobIDs []string

	for _, detectionType := range detectionTypes {
		jobID := fmt.Sprintf("manual-%s-%d", detectionType, time.Now().UnixNano())
		job := &Job{
			ID:        jobID,
			Type:      detectionType,
			State:     JobStatePending,
			CreatedAt: time.Now(),
		}

		if err := m.queue.Enqueue(job); err != nil {
			continue
		}
		jobIDs = append(jobIDs, jobID)
	}

	return jobIDs, nil
}
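
// Example (illustrative sketch): trigger a one-off run and poll its status.
// "volume.vacuum" is a hypothetical type name, and a freshly enqueued job may
// not yet appear in history when GetJobStatus is called.
//
//	jobIDs, err := mgr.TriggerDetection([]string{"volume.vacuum"})
//	if err != nil {
//		return err
//	}
//	for _, id := range jobIDs {
//		if rec, err := mgr.GetJobStatus(id); err == nil {
//			fmt.Printf("job %s: %+v\n", id, rec)
//		}
//	}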

// GetJobStatus returns the execution record for a specific job, searching the
// retained history.
func (m *Manager) GetJobStatus(jobID string) (*ExecutionRecord, error) {
	records := m.queue.GetHistory(10000)
	for _, record := range records {
		if record.JobID == jobID {
			return record, nil
		}
	}
	return nil, fmt.Errorf("job not found: %s", jobID)
}

// CancelJob cancels a pending or scheduled job.
func (m *Manager) CancelJob(jobID string) error {
	if !m.queue.RemoveJob(jobID) {
		return fmt.Errorf("job not found or already completed: %s", jobID)
	}
	return nil
}

// PurgeHistory removes job history older than beforeTime and returns the
// number of records removed.
func (m *Manager) PurgeHistory(beforeTime time.Time) int {
	return m.queue.PurgeOldHistory(beforeTime)
}

// SaveConfig saves a plugin configuration, optionally writing a backup first.
func (m *Manager) SaveConfig(config *PluginConfig, backup bool) error {
	return m.configMgr.SaveConfig(config, backup)
}

// LoadConfig loads a plugin configuration by plugin ID.
func (m *Manager) LoadConfig(pluginID string) (*PluginConfig, error) {
	config, err := m.configMgr.LoadConfig(pluginID)
	if err != nil {
		return nil, fmt.Errorf("failed to load config: %w", err)
	}
	return config, nil
}

// ListConfigs returns all loaded configurations keyed by plugin ID.
func (m *Manager) ListConfigs() map[string]*PluginConfig {
	return m.configMgr.ListConfigs()
}

// DeleteConfig deletes the configuration for a plugin.
func (m *Manager) DeleteConfig(pluginID string) error {
	return m.configMgr.DeleteConfig(pluginID)
}

// GetRegistry returns the plugin registry.
func (m *Manager) GetRegistry() *Registry {
	return m.registry
}

// GetQueue returns the job queue.
func (m *Manager) GetQueue() *JobQueue {
	return m.queue
}

// GetDispatcher returns the dispatcher.
func (m *Manager) GetDispatcher() *Dispatcher {
	return m.dispatcher
}

// GetGRPCServer returns the gRPC server.
func (m *Manager) GetGRPCServer() *GRPCServer {
	return m.grpcServer
}

// GetDetectionHistory returns a copy of the detection history for a job type.
func (m *Manager) GetDetectionHistory(jobType string) []DetectionRecord {
	m.mu.RLock()
	defer m.mu.RUnlock()

	configs := m.configMgr.ListConfigs()
	for _, cfg := range configs {
		if jobCfg, ok := cfg.GetJobTypeConfig(jobType); ok {
			cfg.mu.RLock()
			// Copy so callers cannot mutate the shared history slice.
			history := make([]DetectionRecord, len(jobCfg.DetectionHistory))
			copy(history, jobCfg.DetectionHistory)
			cfg.mu.RUnlock()
			return history
		}
	}
	return []DetectionRecord{}
}

// GetExecutionHistory returns a copy of the execution history for a job type.
func (m *Manager) GetExecutionHistory(jobType string) []ExecutionRecord {
	m.mu.RLock()
	defer m.mu.RUnlock()

	configs := m.configMgr.ListConfigs()
	for _, cfg := range configs {
		if jobCfg, ok := cfg.GetJobTypeConfig(jobType); ok {
			cfg.mu.RLock()
			// Copy so callers cannot mutate the shared history slice.
			history := make([]ExecutionRecord, len(jobCfg.ExecutionHistory))
			copy(history, jobCfg.ExecutionHistory)
			cfg.mu.RUnlock()
			return history
		}
	}
	return []ExecutionRecord{}
}

// RecordDetection prepends a detection record to the history for a job type,
// keeping at most the 50 most recent records.
func (m *Manager) RecordDetection(jobType string, record *DetectionRecord) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	configs := m.configMgr.ListConfigs()
	for _, cfg := range configs {
		if jobCfg, ok := cfg.GetJobTypeConfig(jobType); ok {
			cfg.mu.Lock()
			const maxSize = 50
			jobCfg.DetectionHistory = append([]DetectionRecord{*record}, jobCfg.DetectionHistory...)
			if len(jobCfg.DetectionHistory) > maxSize {
				jobCfg.DetectionHistory = jobCfg.DetectionHistory[:maxSize]
			}
			cfg.mu.Unlock()
			break
		}
	}
}

// RecordExecution prepends an execution record to the history for a job type,
// keeping at most the 100 most recent records.
func (m *Manager) RecordExecution(jobType string, record *ExecutionRecord) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	configs := m.configMgr.ListConfigs()
	for _, cfg := range configs {
		if jobCfg, ok := cfg.GetJobTypeConfig(jobType); ok {
			cfg.mu.Lock()
			const maxSize = 100
			jobCfg.ExecutionHistory = append([]ExecutionRecord{*record}, jobCfg.ExecutionHistory...)
			if len(jobCfg.ExecutionHistory) > maxSize {
				jobCfg.ExecutionHistory = jobCfg.ExecutionHistory[:maxSize]
			}
			cfg.mu.Unlock()
			break
		}
	}
}