// seaweedfs/weed/admin/plugin/manager.go
package plugin
import (
"context"
"fmt"
"sync"
"time"
)
// Manager is the main component orchestrating the plugin system.
// It owns the registry, job queue, dispatcher, configuration manager and
// gRPC server, and runs three background loops (scheduling, health checks,
// detection dispatch) whose lifetimes are bound to ctx/stopChan and wg.
type Manager struct {
mu sync.RWMutex // guards isRunning (and should guard the ticker fields)
registry *Registry // connected plugins and their health state
queue *JobQueue // pending jobs plus execution history
dispatcher *Dispatcher // schedules detections and dispatches jobs to plugins
configMgr *ConfigManager // loads/saves plugin configuration rooted at ConfigDir
grpcServer *GRPCServer // plugin-facing gRPC endpoint
isRunning bool // true between a successful Start and Stop
stopChan chan bool // closed by Stop to signal the background loops
scheduleTicker *time.Ticker // set by schedulerLoop; drives performScheduling
healthCheckTicker *time.Ticker // set by healthCheckLoop; drives performHealthCheck
detectionTicker *time.Ticker // set by detectionLoop; drives processDetectionJobs
wg sync.WaitGroup // tracks the three background goroutines
ctx context.Context // cancelled by Stop to stop the loops
cancel context.CancelFunc
config *ManagerConfig // immutable after NewManager
}
// ManagerConfig holds configuration for the plugin manager.
type ManagerConfig struct {
	ConfigDir              string        // directory holding plugin configuration files
	ScheduleInterval       time.Duration // how often detection scheduling runs
	HealthCheckInterval    time.Duration // how often plugin health is probed
	DetectionInterval      time.Duration // how often queued jobs are dispatched
	MaxQueueSize           int           // upper bound on queued jobs
	MaxHistorySize         int           // upper bound on retained execution records
	DeduplicationTTL       time.Duration // window in which duplicate jobs are suppressed
	HealthCheckTimeout     time.Duration // heartbeat age after which a plugin is unhealthy
	FailureDetectionWindow time.Duration // sliding window for counting failures
	FailureThreshold       int           // failures inside the window before unregistering
}

// DefaultManagerConfig returns a ManagerConfig populated with the default
// tuning values, rooted at the given configuration directory.
func DefaultManagerConfig(configDir string) *ManagerConfig {
	cfg := &ManagerConfig{ConfigDir: configDir}
	cfg.ScheduleInterval = 5 * time.Second
	cfg.HealthCheckInterval = 30 * time.Second
	cfg.DetectionInterval = 10 * time.Second
	cfg.MaxQueueSize = 10000
	cfg.MaxHistorySize = 5000
	cfg.DeduplicationTTL = 1 * time.Minute
	cfg.HealthCheckTimeout = 90 * time.Second
	cfg.FailureDetectionWindow = 5 * time.Minute
	cfg.FailureThreshold = 3
	return cfg
}
// NewManager creates a new plugin manager instance from the given
// configuration. It wires together the config manager, registry, job queue,
// dispatcher and gRPC server but starts no background work; call Start for
// that. Returns an error if config is nil or the config manager cannot be
// created.
func NewManager(config *ManagerConfig) (*Manager, error) {
	if config == nil {
		return nil, fmt.Errorf("config is required")
	}
	cfgMgr, err := NewConfigManager(config.ConfigDir)
	if err != nil {
		return nil, fmt.Errorf("failed to create config manager: %w", err)
	}
	reg := NewRegistry(
		config.HealthCheckTimeout,
		config.FailureDetectionWindow,
		config.FailureThreshold,
	)
	jobs := NewJobQueue(config.MaxHistorySize, config.DeduplicationTTL)
	disp := NewDispatcher(reg, jobs)
	ctx, cancel := context.WithCancel(context.Background())
	return &Manager{
		registry:   reg,
		queue:      jobs,
		dispatcher: disp,
		configMgr:  cfgMgr,
		grpcServer: NewGRPCServer(reg, jobs, disp, cfgMgr),
		stopChan:   make(chan bool),
		ctx:        ctx,
		cancel:     cancel,
		config:     config,
	}, nil
}
// Start initializes and starts the plugin manager: it loads all persisted
// plugin configurations and launches the scheduler, health-check and
// detection background loops. It returns an error if the manager is already
// running or if the configurations cannot be loaded.
func (m *Manager) Start() error {
	// Hold the lock for the whole transition: the previous code reset
	// isRunning on a load failure without the mutex (a data race with
	// IsRunning/Stop), and briefly reported "running" before any
	// goroutine existed.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.isRunning {
		return fmt.Errorf("manager already running")
	}
	// Load existing configurations before declaring the manager running,
	// so a failure leaves it cleanly stopped.
	if err := m.configMgr.LoadAllConfigs(); err != nil {
		return fmt.Errorf("failed to load configurations: %w", err)
	}
	m.isRunning = true
	// Start background tasks.
	m.wg.Add(3)
	go m.schedulerLoop()
	go m.healthCheckLoop()
	go m.detectionLoop()
	return nil
}
// Stop gracefully stops the plugin manager. It cancels the shared context,
// closes stopChan to signal the background loops, then waits up to 30
// seconds for them to exit. Returns an error if the manager is not running
// or the shutdown times out. NOTE(review): stopChan is closed here and never
// recreated, so a stopped manager is not safely restartable via Start.
func (m *Manager) Stop() error {
	m.mu.Lock()
	if !m.isRunning {
		m.mu.Unlock()
		return fmt.Errorf("manager not running")
	}
	m.isRunning = false
	m.mu.Unlock()

	// Signal all background goroutines to stop.
	m.cancel()
	close(m.stopChan)

	// Wait for the goroutines to drain, giving up after 30 seconds.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		m.wg.Wait()
	}()
	deadline := time.NewTimer(30 * time.Second)
	defer deadline.Stop()
	select {
	case <-finished:
		return nil
	case <-deadline.C:
		return fmt.Errorf("timeout waiting for manager to stop")
	}
}
// schedulerLoop periodically triggers detection scheduling until the
// manager's context is cancelled or stopChan is closed. Runs as a background
// goroutine started by Start.
func (m *Manager) schedulerLoop() {
	defer m.wg.Done()
	ticker := time.NewTicker(m.config.ScheduleInterval)
	defer ticker.Stop()
	// Publish the ticker on the struct under the lock; an unsynchronized
	// write to the shared field from this goroutine would race with any
	// concurrent reader. The loop itself uses the local variable.
	m.mu.Lock()
	m.scheduleTicker = ticker
	m.mu.Unlock()
	for {
		select {
		case <-m.ctx.Done():
			return
		case <-m.stopChan:
			return
		case <-ticker.C:
			m.performScheduling()
		}
	}
}
// performScheduling runs one scheduling pass. The dispatcher enqueues any
// resulting jobs itself; the returned slice is not needed here, so it is
// deliberately discarded (the former empty `if len(...) > 0 {}` branch was
// dead code).
func (m *Manager) performScheduling() {
	m.dispatcher.ScheduleDetections()
}
// healthCheckLoop periodically probes plugin health until the manager's
// context is cancelled or stopChan is closed. Runs as a background goroutine
// started by Start.
func (m *Manager) healthCheckLoop() {
	defer m.wg.Done()
	ticker := time.NewTicker(m.config.HealthCheckInterval)
	defer ticker.Stop()
	// Publish the ticker on the struct under the lock; an unsynchronized
	// write to the shared field from this goroutine would race with any
	// concurrent reader. The loop itself uses the local variable.
	m.mu.Lock()
	m.healthCheckTicker = ticker
	m.mu.Unlock()
	for {
		select {
		case <-m.ctx.Done():
			return
		case <-m.stopChan:
			return
		case <-ticker.C:
			m.performHealthCheck()
		}
	}
}
// performHealthCheck probes every registered plugin (unhealthy ones
// included) and unregisters any plugin whose failure count has exceeded the
// configured threshold.
func (m *Manager) performHealthCheck() {
	for _, p := range m.registry.ListPlugins(true) {
		healthy, _ := m.registry.HealthCheck(p.ID)
		if healthy {
			continue
		}
		// Drop plugins that have failed too many times within the
		// failure-detection window.
		if m.registry.HasExceededFailureThreshold(p.ID) {
			m.registry.UnregisterPlugin(p.ID)
		}
	}
}
// detectionLoop periodically drains and dispatches queued detection jobs
// until the manager's context is cancelled or stopChan is closed. Runs as a
// background goroutine started by Start.
func (m *Manager) detectionLoop() {
	defer m.wg.Done()
	ticker := time.NewTicker(m.config.DetectionInterval)
	defer ticker.Stop()
	// Publish the ticker on the struct under the lock; an unsynchronized
	// write to the shared field from this goroutine would race with any
	// concurrent reader. The loop itself uses the local variable.
	m.mu.Lock()
	m.detectionTicker = ticker
	m.mu.Unlock()
	for {
		select {
		case <-m.ctx.Done():
			return
		case <-m.stopChan:
			return
		case <-ticker.C:
			m.processDetectionJobs()
		}
	}
}
// processDetectionJobs drains the queue, dispatching each pending job to an
// available plugin and recording which plugin took it. When a dispatch
// fails, the job is put back and draining stops until the next detection
// tick. NOTE(review): the requeue's Enqueue error is silently discarded —
// a dedup rejection here would drop the job; confirm that is acceptable.
func (m *Manager) processDetectionJobs() {
	for job := m.queue.Dequeue(); job != nil; job = m.queue.Dequeue() {
		pluginID, err := m.dispatcher.DispatchJob(job)
		if err != nil {
			// No plugin could take the job right now; requeue it and
			// retry on the next tick.
			m.queue.Enqueue(job)
			return
		}
		job.PluginID = pluginID
	}
}
// IsRunning reports whether the manager is currently running.
func (m *Manager) IsRunning() bool {
	m.mu.RLock()
	running := m.isRunning
	m.mu.RUnlock()
	return running
}
// RegisterDetectionType registers a new detection type with the dispatcher,
// along with how often it should be scheduled and how many jobs of that
// type may run concurrently.
func (m *Manager) RegisterDetectionType(detectionType string, interval time.Duration, maxConcurrent int) error {
return m.dispatcher.RegisterDetectionType(detectionType, interval, maxConcurrent)
}
// UnregisterDetectionType removes a previously registered detection type
// from the dispatcher.
func (m *Manager) UnregisterDetectionType(detectionType string) error {
return m.dispatcher.UnregisterDetectionType(detectionType)
}
// GetStats returns overall statistics aggregated from the registry, the job
// queue and the dispatcher, plus the manager's current running state.
func (m *Manager) GetStats() map[string]interface{} {
	stats := make(map[string]interface{}, 4)
	stats["registry"] = m.registry.GetStats()
	stats["queue"] = m.queue.GetExecutionStats()
	stats["dispatcher"] = m.dispatcher.GetDispatcherStats()
	stats["running"] = m.IsRunning()
	return stats
}
// GetPluginStats returns a snapshot of statistics for the plugin identified
// by pluginID, or an error if the plugin is unknown to the registry.
func (m *Manager) GetPluginStats(pluginID string) (map[string]interface{}, error) {
	p, err := m.registry.GetPlugin(pluginID)
	if err != nil {
		return nil, err
	}
	// Hold the plugin's own lock while reading its mutable counters so the
	// snapshot is internally consistent.
	p.mu.RLock()
	defer p.mu.RUnlock()
	stats := map[string]interface{}{
		"id":                    p.ID,
		"name":                  p.Name,
		"version":               p.Version,
		"status":                p.Status,
		"capabilities":          p.Capabilities,
		"active_jobs":           p.ActiveJobs,
		"completed_jobs":        p.CompletedJobs,
		"failed_jobs":           p.FailedJobs,
		"total_detections":      p.TotalDetections,
		"avg_execution_time_ms": p.AvgExecutionTimeMs,
		"cpu_usage_percent":     p.CPUUsagePercent,
		"memory_usage_bytes":    p.MemoryUsageBytes,
		"connected_at":          p.ConnectedAt,
		"last_heartbeat":        p.LastHeartbeat,
		"uptime_seconds":        int(time.Since(p.ConnectedAt).Seconds()),
	}
	return stats, nil
}
// ListPlugins returns all registered plugins; when includeUnhealthy is true
// unhealthy plugins are included as well.
func (m *Manager) ListPlugins(includeUnhealthy bool) []*ConnectedPlugin {
return m.registry.ListPlugins(includeUnhealthy)
}
// ListJobs returns up to limit entries of job execution history.
func (m *Manager) ListJobs(limit int) []*ExecutionRecord {
return m.queue.GetHistory(limit)
}
// ListJobsForPlugin returns up to limit execution-history entries for the
// given plugin.
func (m *Manager) ListJobsForPlugin(pluginID string, limit int) []*ExecutionRecord {
return m.queue.GetHistoryForPlugin(pluginID, limit)
}
// ListJobsForType returns up to limit execution-history entries for the
// given job type.
func (m *Manager) ListJobsForType(jobType string, limit int) []*ExecutionRecord {
return m.queue.GetHistoryForJobType(jobType, limit)
}
// TriggerDetection manually enqueues one detection job per requested type
// and returns the IDs of the jobs that were successfully queued. Enqueue
// failures are skipped (best-effort), so the error result is currently
// always nil; a nil slice is returned when nothing was queued.
func (m *Manager) TriggerDetection(detectionTypes []string) ([]string, error) {
	var queued []string
	for _, dt := range detectionTypes {
		id := fmt.Sprintf("manual-%s-%d", dt, time.Now().UnixNano())
		job := &Job{
			ID:        id,
			Type:      dt,
			State:     JobStatePending,
			CreatedAt: time.Now(),
		}
		// NOTE(review): enqueue failures (possibly dedup rejections) are
		// silently skipped rather than reported to the caller.
		if err := m.queue.Enqueue(job); err != nil {
			continue
		}
		queued = append(queued, id)
	}
	return queued, nil
}
// GetJobStatus looks up the execution record for jobID in recent history
// (scans up to 10000 records) and returns an error when no record matches.
func (m *Manager) GetJobStatus(jobID string) (*ExecutionRecord, error) {
	for _, rec := range m.queue.GetHistory(10000) {
		if rec.JobID == jobID {
			return rec, nil
		}
	}
	return nil, fmt.Errorf("job not found: %s", jobID)
}
// CancelJob cancels a pending or scheduled job by removing it from the
// queue. It returns an error when the job cannot be removed — either it is
// unknown or it has already completed.
func (m *Manager) CancelJob(jobID string) error {
if !m.queue.RemoveJob(jobID) {
return fmt.Errorf("job not found or already completed: %s", jobID)
}
return nil
}
// PurgeHistory removes job-history entries recorded before beforeTime and
// returns the number of entries removed.
func (m *Manager) PurgeHistory(beforeTime time.Time) int {
return m.queue.PurgeOldHistory(beforeTime)
}
// SaveConfig persists a plugin configuration through the config manager;
// when backup is true the previous version is preserved as a backup.
func (m *Manager) SaveConfig(config *PluginConfig, backup bool) error {
return m.configMgr.SaveConfig(config, backup)
}
// LoadConfig loads the configuration for pluginID, wrapping any underlying
// error with context.
func (m *Manager) LoadConfig(pluginID string) (*PluginConfig, error) {
config, err := m.configMgr.LoadConfig(pluginID)
if err != nil {
return nil, fmt.Errorf("failed to load config: %w", err)
}
return config, nil
}
// ListConfigs returns all configurations currently loaded by the config
// manager (presumably keyed by plugin ID — verify against ConfigManager).
func (m *Manager) ListConfigs() map[string]*PluginConfig {
return m.configMgr.ListConfigs()
}
// DeleteConfig deletes the stored configuration for pluginID.
func (m *Manager) DeleteConfig(pluginID string) error {
return m.configMgr.DeleteConfig(pluginID)
}
// GetRegistry returns the underlying plugin registry.
func (m *Manager) GetRegistry() *Registry {
return m.registry
}
// GetQueue returns the underlying job queue.
func (m *Manager) GetQueue() *JobQueue {
return m.queue
}
// GetDispatcher returns the underlying dispatcher.
func (m *Manager) GetDispatcher() *Dispatcher {
return m.dispatcher
}
// GetGRPCServer returns the underlying gRPC server.
func (m *Manager) GetGRPCServer() *GRPCServer {
return m.grpcServer
}
// GetDetectionHistory returns a copy of the detection history from the
// first loaded configuration that defines jobType, or an empty slice when
// no configuration defines it.
func (m *Manager) GetDetectionHistory(jobType string) []DetectionRecord {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, cfg := range m.configMgr.ListConfigs() {
		jobCfg, ok := cfg.GetJobTypeConfig(jobType)
		if !ok {
			continue
		}
		// Copy under the config's lock so callers get an isolated slice.
		cfg.mu.RLock()
		out := make([]DetectionRecord, len(jobCfg.DetectionHistory))
		copy(out, jobCfg.DetectionHistory)
		cfg.mu.RUnlock()
		return out
	}
	return []DetectionRecord{}
}
// GetExecutionHistory returns a copy of the execution history from the
// first loaded configuration that defines jobType, or an empty slice when
// no configuration defines it.
func (m *Manager) GetExecutionHistory(jobType string) []ExecutionRecord {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, cfg := range m.configMgr.ListConfigs() {
		jobCfg, ok := cfg.GetJobTypeConfig(jobType)
		if !ok {
			continue
		}
		// Copy under the config's lock so callers get an isolated slice.
		cfg.mu.RLock()
		out := make([]ExecutionRecord, len(jobCfg.ExecutionHistory))
		copy(out, jobCfg.ExecutionHistory)
		cfg.mu.RUnlock()
		return out
	}
	return []ExecutionRecord{}
}
// RecordDetection prepends a detection record to the history of the first
// loaded configuration that defines jobType, trimming the history to at
// most 50 entries (newest first). No-op when no configuration defines the
// type.
func (m *Manager) RecordDetection(jobType string, record *DetectionRecord) {
	const maxDetectionHistory = 50
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, cfg := range m.configMgr.ListConfigs() {
		jobCfg, ok := cfg.GetJobTypeConfig(jobType)
		if !ok {
			continue
		}
		cfg.mu.Lock()
		updated := append([]DetectionRecord{*record}, jobCfg.DetectionHistory...)
		if len(updated) > maxDetectionHistory {
			updated = updated[:maxDetectionHistory]
		}
		jobCfg.DetectionHistory = updated
		cfg.mu.Unlock()
		return
	}
}
// RecordExecution prepends an execution record to the history of the first
// loaded configuration that defines jobType, trimming the history to at
// most 100 entries (newest first). No-op when no configuration defines the
// type.
func (m *Manager) RecordExecution(jobType string, record *ExecutionRecord) {
	const maxExecutionHistory = 100
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, cfg := range m.configMgr.ListConfigs() {
		jobCfg, ok := cfg.GetJobTypeConfig(jobType)
		if !ok {
			continue
		}
		cfg.mu.Lock()
		updated := append([]ExecutionRecord{*record}, jobCfg.ExecutionHistory...)
		if len(updated) > maxExecutionHistory {
			updated = updated[:maxExecutionHistory]
		}
		jobCfg.ExecutionHistory = updated
		cfg.mu.Unlock()
		return
	}
}