package testing import ( "context" "sync" "time" "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" ) // MockPlugin simulates a plugin worker instance for testing type MockPlugin struct { mu sync.RWMutex ID string Name string Version string Status string Capabilities []string CapabilitiesDetail *plugin_pb.PluginCapabilities MaxConcurrentJobs int Config *plugin_pb.PluginConfig ActiveJobs map[string]*MockJobExecution CompletedJobs int FailedJobs int ConnectStreamCalls int ExecuteJobStreamCalls int ReportHealthCalls int GetConfigCalls int SubmitResultCalls int ReceivedJobs []*plugin_pb.ExecuteJobRequest ReceivedHealthReports []*plugin_pb.HealthReport LastError string SimulateError bool SimulateErrorType string SchemaData []byte DetectionResults []*DetectionResult ExecutionResults []*ExecutionResult } // MockJobExecution tracks job execution state type MockJobExecution struct { JobID string Type string Status plugin_pb.ExecutionStatus Progress float32 CurrentStep string StartTime time.Time EndTime *time.Time Result *plugin_pb.JobResult ErrorMessage string } // DetectionResult represents detection results type DetectionResult struct { ResourceID string DetectionType string Severity string Description string Data []byte } // ExecutionResult represents execution results type ExecutionResult struct { ResourceID string Success bool ErrorMessage string Data []byte } // NewMockPlugin creates a new mock plugin func NewMockPlugin(id, name, version string) *MockPlugin { return &MockPlugin{ ID: id, Name: name, Version: version, Status: "ready", Capabilities: make([]string, 0), CapabilitiesDetail: &plugin_pb.PluginCapabilities{}, MaxConcurrentJobs: 5, Config: &plugin_pb.PluginConfig{}, ActiveJobs: make(map[string]*MockJobExecution), ReceivedJobs: make([]*plugin_pb.ExecuteJobRequest, 0), ReceivedHealthReports: make([]*plugin_pb.HealthReport, 0), DetectionResults: make([]*DetectionResult, 0), ExecutionResults: make([]*ExecutionResult, 0), } } // AddCapability adds a capability to the plugin func (m *MockPlugin) AddCapability(cap string) { m.mu.Lock() defer m.mu.Unlock() m.Capabilities = append(m.Capabilities, cap) } // AddDetectionCapability adds a detection capability func (m *MockPlugin) AddDetectionCapability(typ, desc string, minInterval int32, requiresFullScan bool) { m.mu.Lock() defer m.mu.Unlock() if m.CapabilitiesDetail == nil { m.CapabilitiesDetail = &plugin_pb.PluginCapabilities{} } m.CapabilitiesDetail.Detection = append(m.CapabilitiesDetail.Detection, &plugin_pb.DetectionCapability{ Type: typ, Description: desc, MinIntervalSeconds: minInterval, RequiresFullScan: requiresFullScan, }) m.Capabilities = append(m.Capabilities, typ) } // AddMaintenanceCapability adds a maintenance capability func (m *MockPlugin) AddMaintenanceCapability(typ, desc string, requiredDetections []string) { m.mu.Lock() defer m.mu.Unlock() if m.CapabilitiesDetail == nil { m.CapabilitiesDetail = &plugin_pb.PluginCapabilities{} } m.CapabilitiesDetail.Maintenance = append(m.CapabilitiesDetail.Maintenance, &plugin_pb.MaintenanceCapability{ Type: typ, Description: desc, RequiredDetectionTypes: requiredDetections, }) } // SetSchema sets the schema data func (m *MockPlugin) SetSchema(data []byte) { m.mu.Lock() defer m.mu.Unlock() m.SchemaData = data } // AddDetectionResult adds a detection result func (m *MockPlugin) AddDetectionResult(resourceID, detectionType, severity, description string, data []byte) { m.mu.Lock() defer m.mu.Unlock() m.DetectionResults = append(m.DetectionResults, &DetectionResult{ ResourceID: resourceID, DetectionType: detectionType, Severity: severity, Description: description, Data: data, }) } // AddExecutionResult adds an execution result func (m *MockPlugin) AddExecutionResult(resourceID string, success bool, errorMsg string, data []byte) { m.mu.Lock() defer m.mu.Unlock() m.ExecutionResults = append(m.ExecutionResults, &ExecutionResult{ ResourceID: resourceID, Success: success, ErrorMessage: errorMsg, Data: data, }) } // GetConfigurationSchema implements schema retrieval func (m *MockPlugin) GetConfigurationSchema(ctx context.Context) ([]byte, error) { m.mu.RLock() defer m.mu.RUnlock() if m.SimulateError && m.SimulateErrorType == "schema" { return nil, ErrSimulatedError } return m.SchemaData, nil } // DetectJobs implements detection logic func (m *MockPlugin) DetectJobs(ctx context.Context) ([]*DetectionResult, error) { m.mu.Lock() m.ReportHealthCalls++ results := make([]*DetectionResult, len(m.DetectionResults)) copy(results, m.DetectionResults) m.mu.Unlock() if m.SimulateError && m.SimulateErrorType == "detect" { return nil, ErrSimulatedError } return results, nil } // ExecuteJob implements job execution func (m *MockPlugin) ExecuteJob(ctx context.Context, jobID string, jobType string, payload *plugin_pb.JobPayload) (*ExecutionResult, error) { m.mu.Lock() m.ExecuteJobStreamCalls++ execution := &MockJobExecution{ JobID: jobID, Type: jobType, Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_RUNNING, StartTime: time.Now(), Progress: 0, CurrentStep: "initialized", } m.ActiveJobs[jobID] = execution m.mu.Unlock() // Simulate execution steps steps := []string{"initialized", "validating", "processing", "finalizing"} for i, step := range steps { select { case <-ctx.Done(): m.mu.Lock() execution.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_CANCELLED execution.ErrorMessage = "context cancelled" delete(m.ActiveJobs, jobID) m.mu.Unlock() return nil, ctx.Err() default: } m.mu.Lock() execution.CurrentStep = step execution.Progress = float32((i + 1) * 25) m.mu.Unlock() time.Sleep(10 * time.Millisecond) } m.mu.Lock() defer m.mu.Unlock() if m.SimulateError && m.SimulateErrorType == "execute" { execution.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED execution.ErrorMessage = "simulated execution error" m.FailedJobs++ delete(m.ActiveJobs, jobID) return nil, ErrSimulatedError } // Get results result := &ExecutionResult{ ResourceID: jobID, Success: true, ErrorMessage: "", } if len(m.ExecutionResults) > 0 { result = m.ExecutionResults[0] m.ExecutionResults = m.ExecutionResults[1:] } execution.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED execution.Progress = 100 execution.CurrentStep = "completed" now := time.Now() execution.EndTime = &now m.CompletedJobs++ delete(m.ActiveJobs, jobID) return result, nil } // ConnectStream simulates the Connect RPC stream func (m *MockPlugin) ConnectStream(ctx context.Context, conn grpc.ClientConnInterface) error { m.mu.Lock() m.ConnectStreamCalls++ m.mu.Unlock() if m.SimulateError && m.SimulateErrorType == "connect" { return ErrSimulatedError } return nil } // ExecuteJobStream simulates the ExecuteJob RPC stream func (m *MockPlugin) ExecuteJobStream(ctx context.Context, conn grpc.ClientConnInterface, jobID string) error { m.mu.Lock() m.ExecuteJobStreamCalls++ m.mu.Unlock() if m.SimulateError && m.SimulateErrorType == "executestream" { return ErrSimulatedError } return nil } // ReportHealth sends a health report func (m *MockPlugin) ReportHealth(ctx context.Context, conn grpc.ClientConnInterface) error { m.mu.Lock() m.ReportHealthCalls++ activeCount := len(m.ActiveJobs) m.mu.Unlock() if m.SimulateError && m.SimulateErrorType == "health" { return ErrSimulatedError } report := &plugin_pb.HealthReport{ PluginId: m.ID, TimestampMs: time.Now().UnixMilli(), Status: plugin_pb.HealthStatus_HEALTH_STATUS_HEALTHY, ActiveJobs: int32(activeCount), } m.mu.Lock() m.ReceivedHealthReports = append(m.ReceivedHealthReports, report) m.mu.Unlock() return nil } // GetConfig retrieves configuration func (m *MockPlugin) GetConfig(ctx context.Context, conn grpc.ClientConnInterface) (*plugin_pb.PluginConfig, error) { m.mu.Lock() m.GetConfigCalls++ defer m.mu.Unlock() if m.SimulateError && m.SimulateErrorType == "getconfig" { return nil, ErrSimulatedError } return m.Config, nil } // SubmitResult submits job results func (m *MockPlugin) SubmitResult(ctx context.Context, conn grpc.ClientConnInterface, jobID string, result *plugin_pb.JobResult) error { m.mu.Lock() m.SubmitResultCalls++ defer m.mu.Unlock() if m.SimulateError && m.SimulateErrorType == "submitresult" { return ErrSimulatedError } return nil } // GetActiveJobCount returns the number of active jobs func (m *MockPlugin) GetActiveJobCount() int { m.mu.RLock() defer m.mu.RUnlock() return len(m.ActiveJobs) } // GetCompletedJobCount returns the number of completed jobs func (m *MockPlugin) GetCompletedJobCount() int { m.mu.RLock() defer m.mu.RUnlock() return m.CompletedJobs } // GetFailedJobCount returns the number of failed jobs func (m *MockPlugin) GetFailedJobCount() int { m.mu.RLock() defer m.mu.RUnlock() return m.FailedJobs } // GetStreamCallCount returns the count of stream calls func (m *MockPlugin) GetStreamCallCount() int { m.mu.RLock() defer m.mu.RUnlock() return m.ExecuteJobStreamCalls } // GetHealthReportCount returns the count of health reports sent func (m *MockPlugin) GetHealthReportCount() int { m.mu.RLock() defer m.mu.RUnlock() return m.ReportHealthCalls } // EnableErrorSimulation enables error simulation func (m *MockPlugin) EnableErrorSimulation(errorType string) { m.mu.Lock() defer m.mu.Unlock() m.SimulateError = true m.SimulateErrorType = errorType } // DisableErrorSimulation disables error simulation func (m *MockPlugin) DisableErrorSimulation() { m.mu.Lock() defer m.mu.Unlock() m.SimulateError = false m.SimulateErrorType = "" } // Reset clears all counters and state func (m *MockPlugin) Reset() { m.mu.Lock() defer m.mu.Unlock() m.ActiveJobs = make(map[string]*MockJobExecution) m.CompletedJobs = 0 m.FailedJobs = 0 m.ConnectStreamCalls = 0 m.ExecuteJobStreamCalls = 0 m.ReportHealthCalls = 0 m.GetConfigCalls = 0 m.SubmitResultCalls = 0 m.ReceivedJobs = make([]*plugin_pb.ExecuteJobRequest, 0) m.ReceivedHealthReports = make([]*plugin_pb.HealthReport, 0) m.LastError = "" m.SimulateError = false m.SimulateErrorType = "" } // GetJobExecution returns execution details for a job func (m *MockPlugin) GetJobExecution(jobID string) *MockJobExecution { m.mu.RLock() defer m.mu.RUnlock() return m.ActiveJobs[jobID] } // TrackJob records a received job func (m *MockPlugin) TrackJob(req *plugin_pb.ExecuteJobRequest) { m.mu.Lock() defer m.mu.Unlock() m.ReceivedJobs = append(m.ReceivedJobs, req) } // GetReceivedJobCount returns the count of received jobs func (m *MockPlugin) GetReceivedJobCount() int { m.mu.RLock() defer m.mu.RUnlock() return len(m.ReceivedJobs) } // SimulateStreamError simulates an error during streaming func (m *MockPlugin) SimulateStreamError(reason error) { m.mu.Lock() defer m.mu.Unlock() m.LastError = reason.Error() } // SetStatus sets the plugin status func (m *MockPlugin) SetStatus(status string) { m.mu.Lock() defer m.mu.Unlock() m.Status = status } // GetStatus returns the plugin status func (m *MockPlugin) GetStatus() string { m.mu.RLock() defer m.mu.RUnlock() return m.Status }