mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-16 23:01:30 +00:00
312 lines
7.5 KiB
Go
312 lines
7.5 KiB
Go
package plugin
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Registry manages plugin registration and lifecycle
|
|
type Registry struct {
|
|
mu sync.RWMutex
|
|
plugins map[string]*ConnectedPlugin
|
|
capabilityIndex map[string][]string // Maps capability to plugin IDs
|
|
healthCheckTimeout time.Duration
|
|
failureDetectionWindow time.Duration
|
|
failureThreshold int
|
|
pluginFailureCount map[string]int
|
|
}
|
|
|
|
// NewRegistry creates a new plugin registry
|
|
func NewRegistry(healthCheckTimeout, failureDetectionWindow time.Duration, failureThreshold int) *Registry {
|
|
return &Registry{
|
|
plugins: make(map[string]*ConnectedPlugin),
|
|
capabilityIndex: make(map[string][]string),
|
|
healthCheckTimeout: healthCheckTimeout,
|
|
failureDetectionWindow: failureDetectionWindow,
|
|
failureThreshold: failureThreshold,
|
|
pluginFailureCount: make(map[string]int),
|
|
}
|
|
}
|
|
|
|
// RegisterPlugin adds a plugin to the registry
|
|
func (r *Registry) RegisterPlugin(plugin *ConnectedPlugin) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if _, exists := r.plugins[plugin.ID]; exists {
|
|
return fmt.Errorf("plugin %s already registered", plugin.ID)
|
|
}
|
|
|
|
r.plugins[plugin.ID] = plugin
|
|
r.pluginFailureCount[plugin.ID] = 0
|
|
|
|
// Build capability index
|
|
for _, cap := range plugin.Capabilities {
|
|
r.capabilityIndex[cap] = append(r.capabilityIndex[cap], plugin.ID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UnregisterPlugin removes a plugin from the registry
|
|
func (r *Registry) UnregisterPlugin(pluginID string) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
plugin, exists := r.plugins[pluginID]
|
|
if !exists {
|
|
return fmt.Errorf("plugin %s not found", pluginID)
|
|
}
|
|
|
|
// Remove from capability index
|
|
for _, cap := range plugin.Capabilities {
|
|
for i, id := range r.capabilityIndex[cap] {
|
|
if id == pluginID {
|
|
r.capabilityIndex[cap] = append(r.capabilityIndex[cap][:i], r.capabilityIndex[cap][i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
delete(r.plugins, pluginID)
|
|
delete(r.pluginFailureCount, pluginID)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetPlugin retrieves a plugin by ID
|
|
func (r *Registry) GetPlugin(pluginID string) (*ConnectedPlugin, error) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
plugin, exists := r.plugins[pluginID]
|
|
if !exists {
|
|
return nil, fmt.Errorf("plugin %s not found", pluginID)
|
|
}
|
|
|
|
return plugin, nil
|
|
}
|
|
|
|
// GetPluginsByCapability returns all plugins with a specific capability
|
|
func (r *Registry) GetPluginsByCapability(capability string) []*ConnectedPlugin {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
pluginIDs, exists := r.capabilityIndex[capability]
|
|
if !exists {
|
|
return []*ConnectedPlugin{}
|
|
}
|
|
|
|
var result []*ConnectedPlugin
|
|
for _, id := range pluginIDs {
|
|
if plugin, ok := r.plugins[id]; ok {
|
|
result = append(result, plugin)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// ListPlugins returns all registered plugins
|
|
func (r *Registry) ListPlugins(includeUnhealthy bool) []*ConnectedPlugin {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
var result []*ConnectedPlugin
|
|
|
|
for _, plugin := range r.plugins {
|
|
if !includeUnhealthy && time.Since(plugin.LastHeartbeat) > r.healthCheckTimeout {
|
|
continue
|
|
}
|
|
result = append(result, plugin)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// HealthCheck verifies plugin health based on heartbeat status
|
|
func (r *Registry) HealthCheck(pluginID string) (bool, error) {
|
|
r.mu.RLock()
|
|
plugin, exists := r.plugins[pluginID]
|
|
r.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return false, fmt.Errorf("plugin %s not found", pluginID)
|
|
}
|
|
|
|
isHealthy := plugin.IsHealthy(r.healthCheckTimeout)
|
|
|
|
if !isHealthy {
|
|
r.mu.Lock()
|
|
r.pluginFailureCount[pluginID]++
|
|
r.mu.Unlock()
|
|
} else {
|
|
r.mu.Lock()
|
|
r.pluginFailureCount[pluginID] = 0
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
return isHealthy, nil
|
|
}
|
|
|
|
// GetFailureCount returns the current failure count for a plugin
|
|
func (r *Registry) GetFailureCount(pluginID string) int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
return r.pluginFailureCount[pluginID]
|
|
}
|
|
|
|
// HasExceededFailureThreshold checks if a plugin has exceeded the failure threshold
|
|
func (r *Registry) HasExceededFailureThreshold(pluginID string) bool {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
return r.pluginFailureCount[pluginID] > r.failureThreshold
|
|
}
|
|
|
|
// ResetFailureCount resets the failure counter for a plugin
|
|
func (r *Registry) ResetFailureCount(pluginID string) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.pluginFailureCount[pluginID] = 0
|
|
}
|
|
|
|
// Count returns the total number of registered plugins
|
|
func (r *Registry) Count() int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
return len(r.plugins)
|
|
}
|
|
|
|
// CountHealthy returns the number of healthy plugins
|
|
func (r *Registry) CountHealthy() int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
count := 0
|
|
for _, plugin := range r.plugins {
|
|
if plugin.IsHealthy(r.healthCheckTimeout) {
|
|
count++
|
|
}
|
|
}
|
|
return count
|
|
}
|
|
|
|
// GetCapabilities returns all registered capabilities
|
|
func (r *Registry) GetCapabilities() []string {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
var capabilities []string
|
|
for cap := range r.capabilityIndex {
|
|
capabilities = append(capabilities, cap)
|
|
}
|
|
return capabilities
|
|
}
|
|
|
|
// UpdateHeartbeat updates the heartbeat timestamp for a plugin
|
|
func (r *Registry) UpdateHeartbeat(pluginID string) error {
|
|
r.mu.RLock()
|
|
plugin, exists := r.plugins[pluginID]
|
|
r.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("plugin %s not found", pluginID)
|
|
}
|
|
|
|
plugin.UpdateHeartbeat()
|
|
r.ResetFailureCount(pluginID)
|
|
return nil
|
|
}
|
|
|
|
// GetUnhealthyPlugins returns plugins that have failed health checks
|
|
func (r *Registry) GetUnhealthyPlugins() []*ConnectedPlugin {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
var unhealthy []*ConnectedPlugin
|
|
for _, plugin := range r.plugins {
|
|
if !plugin.IsHealthy(r.healthCheckTimeout) {
|
|
unhealthy = append(unhealthy, plugin)
|
|
}
|
|
}
|
|
return unhealthy
|
|
}
|
|
|
|
// RemoveUnhealthyPlugins removes plugins that have exceeded the failure threshold
|
|
func (r *Registry) RemoveUnhealthyPlugins() []string {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
var removed []string
|
|
for pluginID, failureCount := range r.pluginFailureCount {
|
|
if failureCount > r.failureThreshold {
|
|
if plugin, exists := r.plugins[pluginID]; exists {
|
|
// Remove from capability index
|
|
for _, cap := range plugin.Capabilities {
|
|
for i, id := range r.capabilityIndex[cap] {
|
|
if id == pluginID {
|
|
r.capabilityIndex[cap] = append(r.capabilityIndex[cap][:i], r.capabilityIndex[cap][i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
delete(r.plugins, pluginID)
|
|
delete(r.pluginFailureCount, pluginID)
|
|
removed = append(removed, pluginID)
|
|
}
|
|
}
|
|
}
|
|
return removed
|
|
}
|
|
|
|
// UpdatePluginStatus updates the status field of a plugin
|
|
func (r *Registry) UpdatePluginStatus(pluginID, status string) error {
|
|
r.mu.RLock()
|
|
plugin, exists := r.plugins[pluginID]
|
|
r.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("plugin %s not found", pluginID)
|
|
}
|
|
|
|
plugin.mu.Lock()
|
|
plugin.Status = status
|
|
plugin.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetStats returns statistics for all plugins
|
|
func (r *Registry) GetStats() map[string]interface{} {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
totalPlugins := len(r.plugins)
|
|
healthyPlugins := 0
|
|
totalActiveJobs := 0
|
|
totalCompletedJobs := 0
|
|
totalFailedJobs := 0
|
|
|
|
for _, plugin := range r.plugins {
|
|
if plugin.IsHealthy(r.healthCheckTimeout) {
|
|
healthyPlugins++
|
|
}
|
|
plugin.mu.RLock()
|
|
totalActiveJobs += plugin.ActiveJobs
|
|
totalCompletedJobs += plugin.CompletedJobs
|
|
totalFailedJobs += plugin.FailedJobs
|
|
plugin.mu.RUnlock()
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"total_plugins": totalPlugins,
|
|
"healthy_plugins": healthyPlugins,
|
|
"unhealthy_plugins": totalPlugins - healthyPlugins,
|
|
"total_active_jobs": totalActiveJobs,
|
|
"total_completed": totalCompletedJobs,
|
|
"total_failed": totalFailedJobs,
|
|
"capabilities": len(r.capabilityIndex),
|
|
}
|
|
}
|