mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-16 23:01:30 +00:00
- Fix ShowPlugins handler to fetch plugins from registry instead of returning empty
- Update PluginsPageData struct to use []map[string]interface{} for proper data handling
- Rewrite plugins.templ to properly iterate and display plugin details
- Plugin count now displays correctly in the card header
- Plugin table shows ID, Name, Status, Version, Capabilities, and Actions
- Plugin worker successfully connects and displays in admin dashboard
fix: Add plugin routes to non-auth section and resolve route conflicts
- Plugin routes were only registered when authRequired=true (password set)
- When no admin password was set, auth was disabled and routes were skipped
- Also changed route paths to avoid conflicts in Gin router:
- Changed /jobs/:type to /jobs/by-type/:type to avoid conflict with /jobs/:id/cancel
- Changed /jobs/:type/trigger-detection to /trigger-detection/:type
- Changed /jobs/:id/cancel to /cancel-job/:id
- Plugin UI now accessible at http://localhost:23646/plugins
feat: Add plugin_worker command for new plugin system
- Create new generic plugin worker that connects to admin server via gRPC
- Supports multiple plugins: erasure_coding, vacuum, balance
- Replaces old task-based worker system with plugin-based approach
- Automatically registers with admin server on startup
- Sends periodic health reports to admin server
- Configuration saved in working directory
- Usage: weed plugin_worker -admin=localhost:33650 -plugins=erasure_coding,vacuum,balance
fix: Initialize plugin manager and register PluginService on gRPC server
- Initialize plugin manager in admin_server.initPluginManager() instead of placeholder
- Create plugin configuration directory in admin dataDir/plugins
- Register PluginService, AdminQueryService, and AdminCommandService on worker gRPC server
- Plugin worker can now connect and register with admin server
Changes:
- weed/admin/dash/admin_server.go: Properly initialize plugin manager
- weed/admin/dash/worker_grpc_server.go: Register plugin services on gRPC server
Testing:
- Plugin worker connects successfully to admin server
- Plugin capabilities are registered correctly
- Health reporting works as expected
458 lines
13 KiB
Go
458 lines
13 KiB
Go
package plugin
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
)
|
|
|
|
// GRPCServer implements the plugin service gRPC handlers
|
|
type GRPCServer struct {
|
|
mu sync.RWMutex
|
|
registry *Registry
|
|
queue *JobQueue
|
|
dispatcher *Dispatcher
|
|
configMgr *ConfigManager
|
|
streamMu sync.RWMutex
|
|
activeStreams map[string][]chan interface{}
|
|
plugin_pb.UnimplementedPluginServiceServer
|
|
plugin_pb.UnimplementedAdminQueryServiceServer
|
|
plugin_pb.UnimplementedAdminCommandServiceServer
|
|
}
|
|
|
|
// NewGRPCServer creates a new gRPC server
|
|
func NewGRPCServer(registry *Registry, queue *JobQueue, dispatcher *Dispatcher, configMgr *ConfigManager) *GRPCServer {
|
|
return &GRPCServer{
|
|
registry: registry,
|
|
queue: queue,
|
|
dispatcher: dispatcher,
|
|
configMgr: configMgr,
|
|
activeStreams: make(map[string][]chan interface{}),
|
|
}
|
|
}
|
|
|
|
// Connect registers a plugin with the master
|
|
func (gs *GRPCServer) Connect(ctx context.Context, req *plugin_pb.PluginConnectRequest) (*plugin_pb.PluginConnectResponse, error) {
|
|
if req.PluginId == "" {
|
|
return nil, fmt.Errorf("plugin_id is required")
|
|
}
|
|
|
|
// Create ConnectedPlugin instance
|
|
plugin := &ConnectedPlugin{
|
|
ID: req.PluginId,
|
|
Name: req.PluginName,
|
|
Version: req.Version,
|
|
Status: "CONNECTED",
|
|
Capabilities: req.Capabilities,
|
|
MaxConcurrentJobs: int(req.MaxConcurrentJobs),
|
|
ConnectedAt: time.Now(),
|
|
LastHeartbeat: time.Now(),
|
|
Metadata: req.Metadata,
|
|
HealthCheckInterval: 30 * time.Second,
|
|
JobTimeout: 5 * time.Minute,
|
|
}
|
|
|
|
// Register plugin
|
|
if err := gs.registry.RegisterPlugin(plugin); err != nil {
|
|
return nil, fmt.Errorf("failed to register plugin: %w", err)
|
|
}
|
|
|
|
// Load or create configuration
|
|
config, err := gs.configMgr.LoadConfig(req.PluginId)
|
|
if err != nil {
|
|
// Create default config
|
|
config = &PluginConfig{
|
|
PluginID: req.PluginId,
|
|
Properties: make(map[string]string),
|
|
JobTypes: make(map[string]*JobTypeConfig),
|
|
MaxRetries: 3,
|
|
HealthCheckInterval: 30 * time.Second,
|
|
JobTimeout: 5 * time.Minute,
|
|
Environment: make(map[string]string),
|
|
}
|
|
gs.configMgr.SaveConfig(config, false)
|
|
}
|
|
|
|
// Build response
|
|
pbConfig := &plugin_pb.PluginConfig{
|
|
PluginId: config.PluginID,
|
|
Properties: config.Properties,
|
|
MaxRetries: int32(config.MaxRetries),
|
|
Environment: config.Environment,
|
|
}
|
|
|
|
response := &plugin_pb.PluginConnectResponse{
|
|
Success: true,
|
|
Message: "Plugin registered successfully",
|
|
MasterId: "master-1",
|
|
Config: pbConfig,
|
|
AssignedTypes: req.Capabilities,
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// ExecuteJob processes a detection or maintenance job
|
|
func (gs *GRPCServer) ExecuteJob(ctx context.Context, req *plugin_pb.ExecuteJobRequest) (*plugin_pb.ExecuteJobResponse, error) {
|
|
if req.JobId == "" || req.JobType == "" {
|
|
return nil, fmt.Errorf("job_id and job_type are required")
|
|
}
|
|
|
|
response := &plugin_pb.ExecuteJobResponse{
|
|
JobId: req.JobId,
|
|
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_ACCEPTED,
|
|
Message: "Job accepted for execution",
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// ReportHealth processes health reports from plugins
|
|
func (gs *GRPCServer) ReportHealth(ctx context.Context, report *plugin_pb.HealthReport) (*plugin_pb.HealthReportResponse, error) {
|
|
if report.PluginId == "" {
|
|
return nil, fmt.Errorf("plugin_id is required")
|
|
}
|
|
|
|
// Update heartbeat
|
|
if err := gs.registry.UpdateHeartbeat(report.PluginId); err != nil {
|
|
return nil, fmt.Errorf("plugin not found: %w", err)
|
|
}
|
|
|
|
// Update plugin stats
|
|
if plugin, err := gs.registry.GetPlugin(report.PluginId); err == nil {
|
|
plugin.mu.Lock()
|
|
plugin.ActiveJobs = int(report.ActiveJobs)
|
|
plugin.CPUUsagePercent = float64(report.CpuPercent)
|
|
plugin.MemoryUsageBytes = report.MemoryBytes
|
|
plugin.mu.Unlock()
|
|
}
|
|
|
|
return &plugin_pb.HealthReportResponse{
|
|
Acknowledged: true,
|
|
Feedback: "Health report received",
|
|
}, nil
|
|
}
|
|
|
|
// GetConfig retrieves the latest configuration
|
|
func (gs *GRPCServer) GetConfig(ctx context.Context, req *plugin_pb.GetConfigRequest) (*plugin_pb.GetConfigResponse, error) {
|
|
if req.PluginId == "" {
|
|
return nil, fmt.Errorf("plugin_id is required")
|
|
}
|
|
|
|
config, exists := gs.configMgr.GetConfig(req.PluginId)
|
|
if !exists {
|
|
return nil, fmt.Errorf("config not found for plugin: %s", req.PluginId)
|
|
}
|
|
|
|
pbConfig := &plugin_pb.PluginConfig{
|
|
PluginId: config.PluginID,
|
|
Properties: config.Properties,
|
|
MaxRetries: int32(config.MaxRetries),
|
|
Environment: config.Environment,
|
|
}
|
|
|
|
response := &plugin_pb.GetConfigResponse{
|
|
Config: pbConfig,
|
|
Version: gs.configMgr.GetVersion(req.PluginId),
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// SubmitResult sends job execution results back to master
|
|
func (gs *GRPCServer) SubmitResult(ctx context.Context, req *plugin_pb.JobResultRequest) (*plugin_pb.JobResultResponse, error) {
|
|
if req.JobId == "" {
|
|
return nil, fmt.Errorf("job_id is required")
|
|
}
|
|
|
|
actions := []string{}
|
|
|
|
// Process results based on job status
|
|
switch req.Status {
|
|
case plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED:
|
|
actions = append(actions, "ARCHIVED")
|
|
case plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED:
|
|
actions = append(actions, "RETRY", "NOTIFY_ADMIN")
|
|
}
|
|
|
|
response := &plugin_pb.JobResultResponse{
|
|
Acknowledged: true,
|
|
ActionsToTake: actions,
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// GetPluginStats returns statistics for all connected plugins
|
|
func (gs *GRPCServer) GetPluginStats(ctx context.Context, req *plugin_pb.GetPluginStatsRequest) (*plugin_pb.GetPluginStatsResponse, error) {
|
|
response := &plugin_pb.GetPluginStatsResponse{
|
|
Stats: []*plugin_pb.PluginStats{},
|
|
}
|
|
|
|
var plugins []*ConnectedPlugin
|
|
if req.PluginId != "" {
|
|
if plugin, err := gs.registry.GetPlugin(req.PluginId); err == nil {
|
|
plugins = append(plugins, plugin)
|
|
}
|
|
} else {
|
|
plugins = gs.registry.ListPlugins(false)
|
|
}
|
|
|
|
for _, plugin := range plugins {
|
|
stat := &plugin_pb.PluginStats{
|
|
PluginId: plugin.ID,
|
|
Status: plugin.Status,
|
|
ActiveJobs: int32(plugin.ActiveJobs),
|
|
CompletedJobs: int32(plugin.CompletedJobs),
|
|
FailedJobs: int32(plugin.FailedJobs),
|
|
TotalDetections: plugin.TotalDetections,
|
|
AvgExecutionTimeMs: float32(plugin.AvgExecutionTimeMs),
|
|
CpuUsagePercent: float32(plugin.CPUUsagePercent),
|
|
MemoryUsageBytes: plugin.MemoryUsageBytes,
|
|
UptimeSeconds: int32(time.Since(plugin.ConnectedAt).Seconds()),
|
|
}
|
|
response.Stats = append(response.Stats, stat)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// ListPlugins returns information about all registered plugins
|
|
func (gs *GRPCServer) ListPlugins(ctx context.Context, req *plugin_pb.ListPluginsRequest) (*plugin_pb.ListPluginsResponse, error) {
|
|
response := &plugin_pb.ListPluginsResponse{
|
|
Plugins: []*plugin_pb.PluginInfo{},
|
|
}
|
|
|
|
plugins := gs.registry.ListPlugins(!req.IncludeDisabled)
|
|
|
|
for _, plugin := range plugins {
|
|
// Filter by capability if specified
|
|
if len(req.FilterByCapability) > 0 {
|
|
hasCapability := false
|
|
for _, filterCap := range req.FilterByCapability {
|
|
for _, cap := range plugin.Capabilities {
|
|
if cap == filterCap {
|
|
hasCapability = true
|
|
break
|
|
}
|
|
}
|
|
if hasCapability {
|
|
break
|
|
}
|
|
}
|
|
if !hasCapability {
|
|
continue
|
|
}
|
|
}
|
|
|
|
info := &plugin_pb.PluginInfo{
|
|
PluginId: plugin.ID,
|
|
Name: plugin.Name,
|
|
Version: plugin.Version,
|
|
Status: plugin.Status,
|
|
Capabilities: plugin.Capabilities,
|
|
MaxConcurrentJobs: int32(plugin.MaxConcurrentJobs),
|
|
ActiveJobs: int32(plugin.ActiveJobs),
|
|
Metadata: plugin.Metadata,
|
|
}
|
|
response.Plugins = append(response.Plugins, info)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// ListJobs returns current and historical job information
|
|
func (gs *GRPCServer) ListJobs(ctx context.Context, req *plugin_pb.ListJobsRequest) (*plugin_pb.ListJobsResponse, error) {
|
|
response := &plugin_pb.ListJobsResponse{
|
|
Jobs: []*plugin_pb.JobInfo{},
|
|
}
|
|
|
|
var records []*ExecutionRecord
|
|
limit := int(req.Limit)
|
|
if limit <= 0 {
|
|
limit = 100
|
|
}
|
|
|
|
if req.PluginId != "" {
|
|
records = gs.queue.GetHistoryForPlugin(req.PluginId, limit)
|
|
} else {
|
|
records = gs.queue.GetHistory(limit)
|
|
}
|
|
|
|
for _, record := range records {
|
|
info := &plugin_pb.JobInfo{
|
|
JobId: record.JobID,
|
|
JobType: record.JobType,
|
|
PluginId: record.PluginID,
|
|
RetryCount: int32(record.RetryCount),
|
|
LastError: record.LastError,
|
|
}
|
|
response.Jobs = append(response.Jobs, info)
|
|
}
|
|
|
|
response.TotalCount = int32(gs.queue.HistorySize())
|
|
return response, nil
|
|
}
|
|
|
|
// GetJobStatus returns detailed status of a specific job
|
|
func (gs *GRPCServer) GetJobStatus(ctx context.Context, req *plugin_pb.GetJobStatusRequest) (*plugin_pb.GetJobStatusResponse, error) {
|
|
if req.JobId == "" {
|
|
return nil, fmt.Errorf("job_id is required")
|
|
}
|
|
|
|
// Search in queue history
|
|
records := gs.queue.GetHistory(10000)
|
|
for _, record := range records {
|
|
if record.JobID == req.JobId {
|
|
response := &plugin_pb.GetJobStatusResponse{
|
|
DetailedStatus: record.State.String(),
|
|
}
|
|
return response, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("job not found: %s", req.JobId)
|
|
}
|
|
|
|
// GetPluginLogs returns logs from a specific plugin (stub implementation)
|
|
func (gs *GRPCServer) GetPluginLogs(ctx context.Context, req *plugin_pb.GetPluginLogsRequest) (*plugin_pb.GetPluginLogsResponse, error) {
|
|
response := &plugin_pb.GetPluginLogsResponse{
|
|
Entries: []*plugin_pb.LogEntry{},
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
// SaveConfig persists plugin configuration
|
|
func (gs *GRPCServer) SaveConfig(ctx context.Context, req *plugin_pb.SaveConfigRequest) (*plugin_pb.SaveConfigResponse, error) {
|
|
if req.Config == nil {
|
|
return nil, fmt.Errorf("config is required")
|
|
}
|
|
|
|
// Convert from protobuf config to internal config
|
|
config := &PluginConfig{
|
|
PluginID: req.Config.PluginId,
|
|
Properties: req.Config.Properties,
|
|
MaxRetries: int(req.Config.MaxRetries),
|
|
Environment: req.Config.Environment,
|
|
JobTypes: make(map[string]*JobTypeConfig),
|
|
}
|
|
|
|
if err := gs.configMgr.SaveConfig(config, req.BackupExisting); err != nil {
|
|
return nil, fmt.Errorf("failed to save config: %w", err)
|
|
}
|
|
|
|
response := &plugin_pb.SaveConfigResponse{
|
|
Success: true,
|
|
Message: "Configuration saved successfully",
|
|
ConfigVersion: gs.configMgr.GetVersion(req.Config.PluginId),
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// ReloadConfig reloads configuration without restarting
|
|
func (gs *GRPCServer) ReloadConfig(ctx context.Context, req *plugin_pb.ReloadConfigRequest) (*plugin_pb.ReloadConfigResponse, error) {
|
|
if req.PluginId == "" {
|
|
return nil, fmt.Errorf("plugin_id is required")
|
|
}
|
|
|
|
if _, err := gs.configMgr.LoadConfig(req.PluginId); err != nil {
|
|
return nil, fmt.Errorf("failed to reload config: %w", err)
|
|
}
|
|
|
|
response := &plugin_pb.ReloadConfigResponse{
|
|
Success: true,
|
|
Message: "Configuration reloaded successfully",
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// EnablePlugin enables a specific plugin
|
|
func (gs *GRPCServer) EnablePlugin(ctx context.Context, req *plugin_pb.EnablePluginRequest) (*plugin_pb.EnablePluginResponse, error) {
|
|
if err := gs.registry.UpdatePluginStatus(req.PluginId, "ENABLED"); err != nil {
|
|
return nil, fmt.Errorf("failed to enable plugin: %w", err)
|
|
}
|
|
|
|
response := &plugin_pb.EnablePluginResponse{
|
|
Success: true,
|
|
Message: "Plugin enabled successfully",
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// DisablePlugin disables a specific plugin
|
|
func (gs *GRPCServer) DisablePlugin(ctx context.Context, req *plugin_pb.DisablePluginRequest) (*plugin_pb.DisablePluginResponse, error) {
|
|
if err := gs.registry.UpdatePluginStatus(req.PluginId, "DISABLED"); err != nil {
|
|
return nil, fmt.Errorf("failed to disable plugin: %w", err)
|
|
}
|
|
|
|
response := &plugin_pb.DisablePluginResponse{
|
|
Success: true,
|
|
Message: "Plugin disabled successfully",
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// TriggerDetection manually triggers a detection for specific types
|
|
func (gs *GRPCServer) TriggerDetection(ctx context.Context, req *plugin_pb.TriggerDetectionRequest) (*plugin_pb.TriggerDetectionResponse, error) {
|
|
response := &plugin_pb.TriggerDetectionResponse{
|
|
Success: true,
|
|
TriggeredJobIds: []string{},
|
|
}
|
|
|
|
for _, detectionType := range req.DetectionTypes {
|
|
jobID := fmt.Sprintf("trig-%s-%d", detectionType, time.Now().UnixNano())
|
|
job := &Job{
|
|
ID: jobID,
|
|
Type: detectionType,
|
|
State: JobStatePending,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
if err := gs.queue.Enqueue(job); err != nil {
|
|
continue
|
|
}
|
|
response.TriggeredJobIds = append(response.TriggeredJobIds, jobID)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// CancelJob cancels a running job
|
|
func (gs *GRPCServer) CancelJob(ctx context.Context, req *plugin_pb.CancelJobRequest) (*plugin_pb.CancelJobResponse, error) {
|
|
if req.JobId == "" {
|
|
return nil, fmt.Errorf("job_id is required")
|
|
}
|
|
|
|
if gs.queue.RemoveJob(req.JobId) {
|
|
return &plugin_pb.CancelJobResponse{
|
|
Success: true,
|
|
Message: "Job cancelled successfully",
|
|
}, nil
|
|
}
|
|
|
|
return &plugin_pb.CancelJobResponse{
|
|
Success: false,
|
|
Message: "Job not found or already completed",
|
|
}, nil
|
|
}
|
|
|
|
// PurgeHistory clears job history
|
|
func (gs *GRPCServer) PurgeHistory(ctx context.Context, req *plugin_pb.PurgeHistoryRequest) (*plugin_pb.PurgeHistoryResponse, error) {
|
|
beforeTime := time.Unix(0, req.BeforeTimestampMs*1000000)
|
|
deleted := gs.queue.PurgeOldHistory(beforeTime)
|
|
|
|
response := &plugin_pb.PurgeHistoryResponse{
|
|
Success: true,
|
|
RecordsDeleted: int32(deleted),
|
|
}
|
|
|
|
return response, nil
|
|
}
|