Files
seaweedfs/weed/admin/plugin/registry.go

312 lines
7.5 KiB
Go

package plugin
import (
"fmt"
"sync"
"time"
)
// Registry manages plugin registration and lifecycle
type Registry struct {
mu sync.RWMutex
plugins map[string]*ConnectedPlugin
capabilityIndex map[string][]string // Maps capability to plugin IDs
healthCheckTimeout time.Duration
failureDetectionWindow time.Duration
failureThreshold int
pluginFailureCount map[string]int
}
// NewRegistry creates a new plugin registry
func NewRegistry(healthCheckTimeout, failureDetectionWindow time.Duration, failureThreshold int) *Registry {
return &Registry{
plugins: make(map[string]*ConnectedPlugin),
capabilityIndex: make(map[string][]string),
healthCheckTimeout: healthCheckTimeout,
failureDetectionWindow: failureDetectionWindow,
failureThreshold: failureThreshold,
pluginFailureCount: make(map[string]int),
}
}
// RegisterPlugin adds a plugin to the registry
func (r *Registry) RegisterPlugin(plugin *ConnectedPlugin) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.plugins[plugin.ID]; exists {
return fmt.Errorf("plugin %s already registered", plugin.ID)
}
r.plugins[plugin.ID] = plugin
r.pluginFailureCount[plugin.ID] = 0
// Build capability index
for _, cap := range plugin.Capabilities {
r.capabilityIndex[cap] = append(r.capabilityIndex[cap], plugin.ID)
}
return nil
}
// UnregisterPlugin removes a plugin from the registry
func (r *Registry) UnregisterPlugin(pluginID string) error {
r.mu.Lock()
defer r.mu.Unlock()
plugin, exists := r.plugins[pluginID]
if !exists {
return fmt.Errorf("plugin %s not found", pluginID)
}
// Remove from capability index
for _, cap := range plugin.Capabilities {
for i, id := range r.capabilityIndex[cap] {
if id == pluginID {
r.capabilityIndex[cap] = append(r.capabilityIndex[cap][:i], r.capabilityIndex[cap][i+1:]...)
break
}
}
}
delete(r.plugins, pluginID)
delete(r.pluginFailureCount, pluginID)
return nil
}
// GetPlugin retrieves a plugin by ID
func (r *Registry) GetPlugin(pluginID string) (*ConnectedPlugin, error) {
r.mu.RLock()
defer r.mu.RUnlock()
plugin, exists := r.plugins[pluginID]
if !exists {
return nil, fmt.Errorf("plugin %s not found", pluginID)
}
return plugin, nil
}
// GetPluginsByCapability returns all plugins with a specific capability
func (r *Registry) GetPluginsByCapability(capability string) []*ConnectedPlugin {
r.mu.RLock()
defer r.mu.RUnlock()
pluginIDs, exists := r.capabilityIndex[capability]
if !exists {
return []*ConnectedPlugin{}
}
var result []*ConnectedPlugin
for _, id := range pluginIDs {
if plugin, ok := r.plugins[id]; ok {
result = append(result, plugin)
}
}
return result
}
// ListPlugins returns all registered plugins
func (r *Registry) ListPlugins(includeUnhealthy bool) []*ConnectedPlugin {
r.mu.RLock()
defer r.mu.RUnlock()
var result []*ConnectedPlugin
for _, plugin := range r.plugins {
if !includeUnhealthy && time.Since(plugin.LastHeartbeat) > r.healthCheckTimeout {
continue
}
result = append(result, plugin)
}
return result
}
// HealthCheck verifies plugin health based on heartbeat status
func (r *Registry) HealthCheck(pluginID string) (bool, error) {
r.mu.RLock()
plugin, exists := r.plugins[pluginID]
r.mu.RUnlock()
if !exists {
return false, fmt.Errorf("plugin %s not found", pluginID)
}
isHealthy := plugin.IsHealthy(r.healthCheckTimeout)
if !isHealthy {
r.mu.Lock()
r.pluginFailureCount[pluginID]++
r.mu.Unlock()
} else {
r.mu.Lock()
r.pluginFailureCount[pluginID] = 0
r.mu.Unlock()
}
return isHealthy, nil
}
// GetFailureCount returns the current failure count for a plugin
func (r *Registry) GetFailureCount(pluginID string) int {
r.mu.RLock()
defer r.mu.RUnlock()
return r.pluginFailureCount[pluginID]
}
// HasExceededFailureThreshold checks if a plugin has exceeded the failure threshold
func (r *Registry) HasExceededFailureThreshold(pluginID string) bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.pluginFailureCount[pluginID] > r.failureThreshold
}
// ResetFailureCount resets the failure counter for a plugin
func (r *Registry) ResetFailureCount(pluginID string) {
r.mu.Lock()
defer r.mu.Unlock()
r.pluginFailureCount[pluginID] = 0
}
// Count returns the total number of registered plugins
func (r *Registry) Count() int {
r.mu.RLock()
defer r.mu.RUnlock()
return len(r.plugins)
}
// CountHealthy returns the number of healthy plugins
func (r *Registry) CountHealthy() int {
r.mu.RLock()
defer r.mu.RUnlock()
count := 0
for _, plugin := range r.plugins {
if plugin.IsHealthy(r.healthCheckTimeout) {
count++
}
}
return count
}
// GetCapabilities returns all registered capabilities
func (r *Registry) GetCapabilities() []string {
r.mu.RLock()
defer r.mu.RUnlock()
var capabilities []string
for cap := range r.capabilityIndex {
capabilities = append(capabilities, cap)
}
return capabilities
}
// UpdateHeartbeat updates the heartbeat timestamp for a plugin
func (r *Registry) UpdateHeartbeat(pluginID string) error {
r.mu.RLock()
plugin, exists := r.plugins[pluginID]
r.mu.RUnlock()
if !exists {
return fmt.Errorf("plugin %s not found", pluginID)
}
plugin.UpdateHeartbeat()
r.ResetFailureCount(pluginID)
return nil
}
// GetUnhealthyPlugins returns plugins that have failed health checks
func (r *Registry) GetUnhealthyPlugins() []*ConnectedPlugin {
r.mu.RLock()
defer r.mu.RUnlock()
var unhealthy []*ConnectedPlugin
for _, plugin := range r.plugins {
if !plugin.IsHealthy(r.healthCheckTimeout) {
unhealthy = append(unhealthy, plugin)
}
}
return unhealthy
}
// RemoveUnhealthyPlugins removes plugins that have exceeded the failure threshold
func (r *Registry) RemoveUnhealthyPlugins() []string {
r.mu.Lock()
defer r.mu.Unlock()
var removed []string
for pluginID, failureCount := range r.pluginFailureCount {
if failureCount > r.failureThreshold {
if plugin, exists := r.plugins[pluginID]; exists {
// Remove from capability index
for _, cap := range plugin.Capabilities {
for i, id := range r.capabilityIndex[cap] {
if id == pluginID {
r.capabilityIndex[cap] = append(r.capabilityIndex[cap][:i], r.capabilityIndex[cap][i+1:]...)
break
}
}
}
delete(r.plugins, pluginID)
delete(r.pluginFailureCount, pluginID)
removed = append(removed, pluginID)
}
}
}
return removed
}
// UpdatePluginStatus updates the status field of a plugin
func (r *Registry) UpdatePluginStatus(pluginID, status string) error {
r.mu.RLock()
plugin, exists := r.plugins[pluginID]
r.mu.RUnlock()
if !exists {
return fmt.Errorf("plugin %s not found", pluginID)
}
plugin.mu.Lock()
plugin.Status = status
plugin.mu.Unlock()
return nil
}
// GetStats returns statistics for all plugins
func (r *Registry) GetStats() map[string]interface{} {
r.mu.RLock()
defer r.mu.RUnlock()
totalPlugins := len(r.plugins)
healthyPlugins := 0
totalActiveJobs := 0
totalCompletedJobs := 0
totalFailedJobs := 0
for _, plugin := range r.plugins {
if plugin.IsHealthy(r.healthCheckTimeout) {
healthyPlugins++
}
plugin.mu.RLock()
totalActiveJobs += plugin.ActiveJobs
totalCompletedJobs += plugin.CompletedJobs
totalFailedJobs += plugin.FailedJobs
plugin.mu.RUnlock()
}
return map[string]interface{}{
"total_plugins": totalPlugins,
"healthy_plugins": healthyPlugins,
"unhealthy_plugins": totalPlugins - healthyPlugins,
"total_active_jobs": totalActiveJobs,
"total_completed": totalCompletedJobs,
"total_failed": totalFailedJobs,
"capabilities": len(r.capabilityIndex),
}
}