Files
seaweedfs/weed/admin/plugin/grpc_server.go
Chris Lu 5342328836 fix: Display connected plugins in admin UI
- Fix ShowPlugins handler to fetch plugins from registry instead of returning empty
- Update PluginsPageData struct to use []map[string]interface{} for proper data handling
- Rewrite plugins.templ to properly iterate and display plugin details
- Plugin count now displays correctly in the card header
- Plugin table shows ID, Name, Status, Version, Capabilities, and Actions
- Plugin worker successfully connects and displays in admin dashboard

fix: Add plugin routes to non-auth section and resolve route conflicts

- Plugin routes were only registered when authRequired=true (password set)
- When no admin password was set, auth was disabled and routes were skipped
- Also changed route paths to avoid conflicts in Gin router:
  - Changed /jobs/:type to /jobs/by-type/:type to avoid conflict with /jobs/:id/cancel
  - Changed /jobs/:type/trigger-detection to /trigger-detection/:type
  - Changed /jobs/:id/cancel to /cancel-job/:id
- Plugin UI now accessible at http://localhost:23646/plugins

feat: Add plugin_worker command for new plugin system

- Create new generic plugin worker that connects to admin server via gRPC
- Supports multiple plugins: erasure_coding, vacuum, balance
- Replaces old task-based worker system with plugin-based approach
- Automatically registers with admin server on startup
- Sends periodic health reports to admin server
- Configuration saved in working directory
- Usage: weed plugin_worker -admin=localhost:33650 -plugins=erasure_coding,vacuum,balance

fix: Initialize plugin manager and register PluginService on gRPC server

- Initialize plugin manager in admin_server.initPluginManager() instead of placeholder
- Create plugin configuration directory in admin dataDir/plugins
- Register PluginService, AdminQueryService, and AdminCommandService on worker gRPC server
- Plugin worker can now connect and register with admin server

Changes:
- weed/admin/dash/admin_server.go: Properly initialize plugin manager
- weed/admin/dash/worker_grpc_server.go: Register plugin services on gRPC server

Testing:
- Plugin worker connects successfully to admin server
- Plugin capabilities are registered correctly
- Health reporting works as expected
2026-02-17 11:08:35 -08:00

458 lines
13 KiB
Go

package plugin
import (
"context"
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
)
// GRPCServer implements the plugin service gRPC handlers
type GRPCServer struct {
mu sync.RWMutex
registry *Registry
queue *JobQueue
dispatcher *Dispatcher
configMgr *ConfigManager
streamMu sync.RWMutex
activeStreams map[string][]chan interface{}
plugin_pb.UnimplementedPluginServiceServer
plugin_pb.UnimplementedAdminQueryServiceServer
plugin_pb.UnimplementedAdminCommandServiceServer
}
// NewGRPCServer creates a new gRPC server
func NewGRPCServer(registry *Registry, queue *JobQueue, dispatcher *Dispatcher, configMgr *ConfigManager) *GRPCServer {
return &GRPCServer{
registry: registry,
queue: queue,
dispatcher: dispatcher,
configMgr: configMgr,
activeStreams: make(map[string][]chan interface{}),
}
}
// Connect registers a plugin with the master
func (gs *GRPCServer) Connect(ctx context.Context, req *plugin_pb.PluginConnectRequest) (*plugin_pb.PluginConnectResponse, error) {
if req.PluginId == "" {
return nil, fmt.Errorf("plugin_id is required")
}
// Create ConnectedPlugin instance
plugin := &ConnectedPlugin{
ID: req.PluginId,
Name: req.PluginName,
Version: req.Version,
Status: "CONNECTED",
Capabilities: req.Capabilities,
MaxConcurrentJobs: int(req.MaxConcurrentJobs),
ConnectedAt: time.Now(),
LastHeartbeat: time.Now(),
Metadata: req.Metadata,
HealthCheckInterval: 30 * time.Second,
JobTimeout: 5 * time.Minute,
}
// Register plugin
if err := gs.registry.RegisterPlugin(plugin); err != nil {
return nil, fmt.Errorf("failed to register plugin: %w", err)
}
// Load or create configuration
config, err := gs.configMgr.LoadConfig(req.PluginId)
if err != nil {
// Create default config
config = &PluginConfig{
PluginID: req.PluginId,
Properties: make(map[string]string),
JobTypes: make(map[string]*JobTypeConfig),
MaxRetries: 3,
HealthCheckInterval: 30 * time.Second,
JobTimeout: 5 * time.Minute,
Environment: make(map[string]string),
}
gs.configMgr.SaveConfig(config, false)
}
// Build response
pbConfig := &plugin_pb.PluginConfig{
PluginId: config.PluginID,
Properties: config.Properties,
MaxRetries: int32(config.MaxRetries),
Environment: config.Environment,
}
response := &plugin_pb.PluginConnectResponse{
Success: true,
Message: "Plugin registered successfully",
MasterId: "master-1",
Config: pbConfig,
AssignedTypes: req.Capabilities,
}
return response, nil
}
// ExecuteJob processes a detection or maintenance job
func (gs *GRPCServer) ExecuteJob(ctx context.Context, req *plugin_pb.ExecuteJobRequest) (*plugin_pb.ExecuteJobResponse, error) {
if req.JobId == "" || req.JobType == "" {
return nil, fmt.Errorf("job_id and job_type are required")
}
response := &plugin_pb.ExecuteJobResponse{
JobId: req.JobId,
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_ACCEPTED,
Message: "Job accepted for execution",
}
return response, nil
}
// ReportHealth processes health reports from plugins
func (gs *GRPCServer) ReportHealth(ctx context.Context, report *plugin_pb.HealthReport) (*plugin_pb.HealthReportResponse, error) {
if report.PluginId == "" {
return nil, fmt.Errorf("plugin_id is required")
}
// Update heartbeat
if err := gs.registry.UpdateHeartbeat(report.PluginId); err != nil {
return nil, fmt.Errorf("plugin not found: %w", err)
}
// Update plugin stats
if plugin, err := gs.registry.GetPlugin(report.PluginId); err == nil {
plugin.mu.Lock()
plugin.ActiveJobs = int(report.ActiveJobs)
plugin.CPUUsagePercent = float64(report.CpuPercent)
plugin.MemoryUsageBytes = report.MemoryBytes
plugin.mu.Unlock()
}
return &plugin_pb.HealthReportResponse{
Acknowledged: true,
Feedback: "Health report received",
}, nil
}
// GetConfig retrieves the latest configuration
func (gs *GRPCServer) GetConfig(ctx context.Context, req *plugin_pb.GetConfigRequest) (*plugin_pb.GetConfigResponse, error) {
if req.PluginId == "" {
return nil, fmt.Errorf("plugin_id is required")
}
config, exists := gs.configMgr.GetConfig(req.PluginId)
if !exists {
return nil, fmt.Errorf("config not found for plugin: %s", req.PluginId)
}
pbConfig := &plugin_pb.PluginConfig{
PluginId: config.PluginID,
Properties: config.Properties,
MaxRetries: int32(config.MaxRetries),
Environment: config.Environment,
}
response := &plugin_pb.GetConfigResponse{
Config: pbConfig,
Version: gs.configMgr.GetVersion(req.PluginId),
}
return response, nil
}
// SubmitResult sends job execution results back to master
func (gs *GRPCServer) SubmitResult(ctx context.Context, req *plugin_pb.JobResultRequest) (*plugin_pb.JobResultResponse, error) {
if req.JobId == "" {
return nil, fmt.Errorf("job_id is required")
}
actions := []string{}
// Process results based on job status
switch req.Status {
case plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED:
actions = append(actions, "ARCHIVED")
case plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED:
actions = append(actions, "RETRY", "NOTIFY_ADMIN")
}
response := &plugin_pb.JobResultResponse{
Acknowledged: true,
ActionsToTake: actions,
}
return response, nil
}
// GetPluginStats returns statistics for all connected plugins
func (gs *GRPCServer) GetPluginStats(ctx context.Context, req *plugin_pb.GetPluginStatsRequest) (*plugin_pb.GetPluginStatsResponse, error) {
response := &plugin_pb.GetPluginStatsResponse{
Stats: []*plugin_pb.PluginStats{},
}
var plugins []*ConnectedPlugin
if req.PluginId != "" {
if plugin, err := gs.registry.GetPlugin(req.PluginId); err == nil {
plugins = append(plugins, plugin)
}
} else {
plugins = gs.registry.ListPlugins(false)
}
for _, plugin := range plugins {
stat := &plugin_pb.PluginStats{
PluginId: plugin.ID,
Status: plugin.Status,
ActiveJobs: int32(plugin.ActiveJobs),
CompletedJobs: int32(plugin.CompletedJobs),
FailedJobs: int32(plugin.FailedJobs),
TotalDetections: plugin.TotalDetections,
AvgExecutionTimeMs: float32(plugin.AvgExecutionTimeMs),
CpuUsagePercent: float32(plugin.CPUUsagePercent),
MemoryUsageBytes: plugin.MemoryUsageBytes,
UptimeSeconds: int32(time.Since(plugin.ConnectedAt).Seconds()),
}
response.Stats = append(response.Stats, stat)
}
return response, nil
}
// ListPlugins returns information about all registered plugins
func (gs *GRPCServer) ListPlugins(ctx context.Context, req *plugin_pb.ListPluginsRequest) (*plugin_pb.ListPluginsResponse, error) {
response := &plugin_pb.ListPluginsResponse{
Plugins: []*plugin_pb.PluginInfo{},
}
plugins := gs.registry.ListPlugins(!req.IncludeDisabled)
for _, plugin := range plugins {
// Filter by capability if specified
if len(req.FilterByCapability) > 0 {
hasCapability := false
for _, filterCap := range req.FilterByCapability {
for _, cap := range plugin.Capabilities {
if cap == filterCap {
hasCapability = true
break
}
}
if hasCapability {
break
}
}
if !hasCapability {
continue
}
}
info := &plugin_pb.PluginInfo{
PluginId: plugin.ID,
Name: plugin.Name,
Version: plugin.Version,
Status: plugin.Status,
Capabilities: plugin.Capabilities,
MaxConcurrentJobs: int32(plugin.MaxConcurrentJobs),
ActiveJobs: int32(plugin.ActiveJobs),
Metadata: plugin.Metadata,
}
response.Plugins = append(response.Plugins, info)
}
return response, nil
}
// ListJobs returns current and historical job information
func (gs *GRPCServer) ListJobs(ctx context.Context, req *plugin_pb.ListJobsRequest) (*plugin_pb.ListJobsResponse, error) {
response := &plugin_pb.ListJobsResponse{
Jobs: []*plugin_pb.JobInfo{},
}
var records []*ExecutionRecord
limit := int(req.Limit)
if limit <= 0 {
limit = 100
}
if req.PluginId != "" {
records = gs.queue.GetHistoryForPlugin(req.PluginId, limit)
} else {
records = gs.queue.GetHistory(limit)
}
for _, record := range records {
info := &plugin_pb.JobInfo{
JobId: record.JobID,
JobType: record.JobType,
PluginId: record.PluginID,
RetryCount: int32(record.RetryCount),
LastError: record.LastError,
}
response.Jobs = append(response.Jobs, info)
}
response.TotalCount = int32(gs.queue.HistorySize())
return response, nil
}
// GetJobStatus returns detailed status of a specific job
func (gs *GRPCServer) GetJobStatus(ctx context.Context, req *plugin_pb.GetJobStatusRequest) (*plugin_pb.GetJobStatusResponse, error) {
if req.JobId == "" {
return nil, fmt.Errorf("job_id is required")
}
// Search in queue history
records := gs.queue.GetHistory(10000)
for _, record := range records {
if record.JobID == req.JobId {
response := &plugin_pb.GetJobStatusResponse{
DetailedStatus: record.State.String(),
}
return response, nil
}
}
return nil, fmt.Errorf("job not found: %s", req.JobId)
}
// GetPluginLogs returns logs from a specific plugin (stub implementation)
func (gs *GRPCServer) GetPluginLogs(ctx context.Context, req *plugin_pb.GetPluginLogsRequest) (*plugin_pb.GetPluginLogsResponse, error) {
response := &plugin_pb.GetPluginLogsResponse{
Entries: []*plugin_pb.LogEntry{},
}
return response, nil
}
// SaveConfig persists plugin configuration
func (gs *GRPCServer) SaveConfig(ctx context.Context, req *plugin_pb.SaveConfigRequest) (*plugin_pb.SaveConfigResponse, error) {
if req.Config == nil {
return nil, fmt.Errorf("config is required")
}
// Convert from protobuf config to internal config
config := &PluginConfig{
PluginID: req.Config.PluginId,
Properties: req.Config.Properties,
MaxRetries: int(req.Config.MaxRetries),
Environment: req.Config.Environment,
JobTypes: make(map[string]*JobTypeConfig),
}
if err := gs.configMgr.SaveConfig(config, req.BackupExisting); err != nil {
return nil, fmt.Errorf("failed to save config: %w", err)
}
response := &plugin_pb.SaveConfigResponse{
Success: true,
Message: "Configuration saved successfully",
ConfigVersion: gs.configMgr.GetVersion(req.Config.PluginId),
}
return response, nil
}
// ReloadConfig reloads configuration without restarting
func (gs *GRPCServer) ReloadConfig(ctx context.Context, req *plugin_pb.ReloadConfigRequest) (*plugin_pb.ReloadConfigResponse, error) {
if req.PluginId == "" {
return nil, fmt.Errorf("plugin_id is required")
}
if _, err := gs.configMgr.LoadConfig(req.PluginId); err != nil {
return nil, fmt.Errorf("failed to reload config: %w", err)
}
response := &plugin_pb.ReloadConfigResponse{
Success: true,
Message: "Configuration reloaded successfully",
}
return response, nil
}
// EnablePlugin enables a specific plugin
func (gs *GRPCServer) EnablePlugin(ctx context.Context, req *plugin_pb.EnablePluginRequest) (*plugin_pb.EnablePluginResponse, error) {
if err := gs.registry.UpdatePluginStatus(req.PluginId, "ENABLED"); err != nil {
return nil, fmt.Errorf("failed to enable plugin: %w", err)
}
response := &plugin_pb.EnablePluginResponse{
Success: true,
Message: "Plugin enabled successfully",
}
return response, nil
}
// DisablePlugin disables a specific plugin
func (gs *GRPCServer) DisablePlugin(ctx context.Context, req *plugin_pb.DisablePluginRequest) (*plugin_pb.DisablePluginResponse, error) {
if err := gs.registry.UpdatePluginStatus(req.PluginId, "DISABLED"); err != nil {
return nil, fmt.Errorf("failed to disable plugin: %w", err)
}
response := &plugin_pb.DisablePluginResponse{
Success: true,
Message: "Plugin disabled successfully",
}
return response, nil
}
// TriggerDetection manually triggers a detection for specific types
func (gs *GRPCServer) TriggerDetection(ctx context.Context, req *plugin_pb.TriggerDetectionRequest) (*plugin_pb.TriggerDetectionResponse, error) {
response := &plugin_pb.TriggerDetectionResponse{
Success: true,
TriggeredJobIds: []string{},
}
for _, detectionType := range req.DetectionTypes {
jobID := fmt.Sprintf("trig-%s-%d", detectionType, time.Now().UnixNano())
job := &Job{
ID: jobID,
Type: detectionType,
State: JobStatePending,
CreatedAt: time.Now(),
}
if err := gs.queue.Enqueue(job); err != nil {
continue
}
response.TriggeredJobIds = append(response.TriggeredJobIds, jobID)
}
return response, nil
}
// CancelJob cancels a running job
func (gs *GRPCServer) CancelJob(ctx context.Context, req *plugin_pb.CancelJobRequest) (*plugin_pb.CancelJobResponse, error) {
if req.JobId == "" {
return nil, fmt.Errorf("job_id is required")
}
if gs.queue.RemoveJob(req.JobId) {
return &plugin_pb.CancelJobResponse{
Success: true,
Message: "Job cancelled successfully",
}, nil
}
return &plugin_pb.CancelJobResponse{
Success: false,
Message: "Job not found or already completed",
}, nil
}
// PurgeHistory clears job history
func (gs *GRPCServer) PurgeHistory(ctx context.Context, req *plugin_pb.PurgeHistoryRequest) (*plugin_pb.PurgeHistoryResponse, error) {
beforeTime := time.Unix(0, req.BeforeTimestampMs*1000000)
deleted := gs.queue.PurgeOldHistory(beforeTime)
response := &plugin_pb.PurgeHistoryResponse{
Success: true,
RecordsDeleted: int32(deleted),
}
return response, nil
}