mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-17 07:11:30 +00:00
450 lines
12 KiB
Go
450 lines
12 KiB
Go
package testing
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
)
|
|
|
|
// MockPlugin simulates a plugin worker instance for testing
|
|
type MockPlugin struct {
|
|
mu sync.RWMutex
|
|
ID string
|
|
Name string
|
|
Version string
|
|
Status string
|
|
Capabilities []string
|
|
CapabilitiesDetail *plugin_pb.PluginCapabilities
|
|
MaxConcurrentJobs int
|
|
Config *plugin_pb.PluginConfig
|
|
ActiveJobs map[string]*MockJobExecution
|
|
CompletedJobs int
|
|
FailedJobs int
|
|
ConnectStreamCalls int
|
|
ExecuteJobStreamCalls int
|
|
ReportHealthCalls int
|
|
GetConfigCalls int
|
|
SubmitResultCalls int
|
|
ReceivedJobs []*plugin_pb.ExecuteJobRequest
|
|
ReceivedHealthReports []*plugin_pb.HealthReport
|
|
LastError string
|
|
SimulateError bool
|
|
SimulateErrorType string
|
|
SchemaData []byte
|
|
DetectionResults []*DetectionResult
|
|
ExecutionResults []*ExecutionResult
|
|
}
|
|
|
|
// MockJobExecution tracks job execution state
|
|
type MockJobExecution struct {
|
|
JobID string
|
|
Type string
|
|
Status plugin_pb.ExecutionStatus
|
|
Progress float32
|
|
CurrentStep string
|
|
StartTime time.Time
|
|
EndTime *time.Time
|
|
Result *plugin_pb.JobResult
|
|
ErrorMessage string
|
|
}
|
|
|
|
// DetectionResult represents detection results
|
|
type DetectionResult struct {
|
|
ResourceID string
|
|
DetectionType string
|
|
Severity string
|
|
Description string
|
|
Data []byte
|
|
}
|
|
|
|
// ExecutionResult represents execution results
|
|
type ExecutionResult struct {
|
|
ResourceID string
|
|
Success bool
|
|
ErrorMessage string
|
|
Data []byte
|
|
}
|
|
|
|
// NewMockPlugin creates a new mock plugin
|
|
func NewMockPlugin(id, name, version string) *MockPlugin {
|
|
return &MockPlugin{
|
|
ID: id,
|
|
Name: name,
|
|
Version: version,
|
|
Status: "ready",
|
|
Capabilities: make([]string, 0),
|
|
CapabilitiesDetail: &plugin_pb.PluginCapabilities{},
|
|
MaxConcurrentJobs: 5,
|
|
Config: &plugin_pb.PluginConfig{},
|
|
ActiveJobs: make(map[string]*MockJobExecution),
|
|
ReceivedJobs: make([]*plugin_pb.ExecuteJobRequest, 0),
|
|
ReceivedHealthReports: make([]*plugin_pb.HealthReport, 0),
|
|
DetectionResults: make([]*DetectionResult, 0),
|
|
ExecutionResults: make([]*ExecutionResult, 0),
|
|
}
|
|
}
|
|
|
|
// AddCapability adds a capability to the plugin
|
|
func (m *MockPlugin) AddCapability(cap string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.Capabilities = append(m.Capabilities, cap)
|
|
}
|
|
|
|
// AddDetectionCapability adds a detection capability
|
|
func (m *MockPlugin) AddDetectionCapability(typ, desc string, minInterval int32, requiresFullScan bool) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if m.CapabilitiesDetail == nil {
|
|
m.CapabilitiesDetail = &plugin_pb.PluginCapabilities{}
|
|
}
|
|
m.CapabilitiesDetail.Detection = append(m.CapabilitiesDetail.Detection, &plugin_pb.DetectionCapability{
|
|
Type: typ,
|
|
Description: desc,
|
|
MinIntervalSeconds: minInterval,
|
|
RequiresFullScan: requiresFullScan,
|
|
})
|
|
m.Capabilities = append(m.Capabilities, typ)
|
|
}
|
|
|
|
// AddMaintenanceCapability adds a maintenance capability
|
|
func (m *MockPlugin) AddMaintenanceCapability(typ, desc string, requiredDetections []string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
if m.CapabilitiesDetail == nil {
|
|
m.CapabilitiesDetail = &plugin_pb.PluginCapabilities{}
|
|
}
|
|
m.CapabilitiesDetail.Maintenance = append(m.CapabilitiesDetail.Maintenance, &plugin_pb.MaintenanceCapability{
|
|
Type: typ,
|
|
Description: desc,
|
|
RequiredDetectionTypes: requiredDetections,
|
|
})
|
|
}
|
|
|
|
// SetSchema sets the schema data
|
|
func (m *MockPlugin) SetSchema(data []byte) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.SchemaData = data
|
|
}
|
|
|
|
// AddDetectionResult adds a detection result
|
|
func (m *MockPlugin) AddDetectionResult(resourceID, detectionType, severity, description string, data []byte) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.DetectionResults = append(m.DetectionResults, &DetectionResult{
|
|
ResourceID: resourceID,
|
|
DetectionType: detectionType,
|
|
Severity: severity,
|
|
Description: description,
|
|
Data: data,
|
|
})
|
|
}
|
|
|
|
// AddExecutionResult adds an execution result
|
|
func (m *MockPlugin) AddExecutionResult(resourceID string, success bool, errorMsg string, data []byte) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.ExecutionResults = append(m.ExecutionResults, &ExecutionResult{
|
|
ResourceID: resourceID,
|
|
Success: success,
|
|
ErrorMessage: errorMsg,
|
|
Data: data,
|
|
})
|
|
}
|
|
|
|
// GetConfigurationSchema implements schema retrieval
|
|
func (m *MockPlugin) GetConfigurationSchema(ctx context.Context) ([]byte, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "schema" {
|
|
return nil, ErrSimulatedError
|
|
}
|
|
|
|
return m.SchemaData, nil
|
|
}
|
|
|
|
// DetectJobs implements detection logic
|
|
func (m *MockPlugin) DetectJobs(ctx context.Context) ([]*DetectionResult, error) {
|
|
m.mu.Lock()
|
|
m.ReportHealthCalls++
|
|
results := make([]*DetectionResult, len(m.DetectionResults))
|
|
copy(results, m.DetectionResults)
|
|
m.mu.Unlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "detect" {
|
|
return nil, ErrSimulatedError
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// ExecuteJob implements job execution
|
|
func (m *MockPlugin) ExecuteJob(ctx context.Context, jobID string, jobType string, payload *plugin_pb.JobPayload) (*ExecutionResult, error) {
|
|
m.mu.Lock()
|
|
m.ExecuteJobStreamCalls++
|
|
|
|
execution := &MockJobExecution{
|
|
JobID: jobID,
|
|
Type: jobType,
|
|
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_RUNNING,
|
|
StartTime: time.Now(),
|
|
Progress: 0,
|
|
CurrentStep: "initialized",
|
|
}
|
|
m.ActiveJobs[jobID] = execution
|
|
m.mu.Unlock()
|
|
|
|
// Simulate execution steps
|
|
steps := []string{"initialized", "validating", "processing", "finalizing"}
|
|
|
|
for i, step := range steps {
|
|
select {
|
|
case <-ctx.Done():
|
|
m.mu.Lock()
|
|
execution.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_CANCELLED
|
|
execution.ErrorMessage = "context cancelled"
|
|
delete(m.ActiveJobs, jobID)
|
|
m.mu.Unlock()
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
m.mu.Lock()
|
|
execution.CurrentStep = step
|
|
execution.Progress = float32((i + 1) * 25)
|
|
m.mu.Unlock()
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "execute" {
|
|
execution.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED
|
|
execution.ErrorMessage = "simulated execution error"
|
|
m.FailedJobs++
|
|
delete(m.ActiveJobs, jobID)
|
|
return nil, ErrSimulatedError
|
|
}
|
|
|
|
// Get results
|
|
result := &ExecutionResult{
|
|
ResourceID: jobID,
|
|
Success: true,
|
|
ErrorMessage: "",
|
|
}
|
|
|
|
if len(m.ExecutionResults) > 0 {
|
|
result = m.ExecutionResults[0]
|
|
m.ExecutionResults = m.ExecutionResults[1:]
|
|
}
|
|
|
|
execution.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED
|
|
execution.Progress = 100
|
|
execution.CurrentStep = "completed"
|
|
now := time.Now()
|
|
execution.EndTime = &now
|
|
|
|
m.CompletedJobs++
|
|
delete(m.ActiveJobs, jobID)
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ConnectStream simulates the Connect RPC stream
|
|
func (m *MockPlugin) ConnectStream(ctx context.Context, conn grpc.ClientConnInterface) error {
|
|
m.mu.Lock()
|
|
m.ConnectStreamCalls++
|
|
m.mu.Unlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "connect" {
|
|
return ErrSimulatedError
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ExecuteJobStream simulates the ExecuteJob RPC stream
|
|
func (m *MockPlugin) ExecuteJobStream(ctx context.Context, conn grpc.ClientConnInterface, jobID string) error {
|
|
m.mu.Lock()
|
|
m.ExecuteJobStreamCalls++
|
|
m.mu.Unlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "executestream" {
|
|
return ErrSimulatedError
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReportHealth sends a health report
|
|
func (m *MockPlugin) ReportHealth(ctx context.Context, conn grpc.ClientConnInterface) error {
|
|
m.mu.Lock()
|
|
m.ReportHealthCalls++
|
|
|
|
activeCount := len(m.ActiveJobs)
|
|
m.mu.Unlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "health" {
|
|
return ErrSimulatedError
|
|
}
|
|
|
|
report := &plugin_pb.HealthReport{
|
|
PluginId: m.ID,
|
|
TimestampMs: time.Now().UnixMilli(),
|
|
Status: plugin_pb.HealthStatus_HEALTH_STATUS_HEALTHY,
|
|
ActiveJobs: int32(activeCount),
|
|
}
|
|
|
|
m.mu.Lock()
|
|
m.ReceivedHealthReports = append(m.ReceivedHealthReports, report)
|
|
m.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetConfig retrieves configuration
|
|
func (m *MockPlugin) GetConfig(ctx context.Context, conn grpc.ClientConnInterface) (*plugin_pb.PluginConfig, error) {
|
|
m.mu.Lock()
|
|
m.GetConfigCalls++
|
|
defer m.mu.Unlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "getconfig" {
|
|
return nil, ErrSimulatedError
|
|
}
|
|
|
|
return m.Config, nil
|
|
}
|
|
|
|
// SubmitResult submits job results
|
|
func (m *MockPlugin) SubmitResult(ctx context.Context, conn grpc.ClientConnInterface, jobID string, result *plugin_pb.JobResult) error {
|
|
m.mu.Lock()
|
|
m.SubmitResultCalls++
|
|
defer m.mu.Unlock()
|
|
|
|
if m.SimulateError && m.SimulateErrorType == "submitresult" {
|
|
return ErrSimulatedError
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetActiveJobCount returns the number of active jobs
|
|
func (m *MockPlugin) GetActiveJobCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return len(m.ActiveJobs)
|
|
}
|
|
|
|
// GetCompletedJobCount returns the number of completed jobs
|
|
func (m *MockPlugin) GetCompletedJobCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.CompletedJobs
|
|
}
|
|
|
|
// GetFailedJobCount returns the number of failed jobs
|
|
func (m *MockPlugin) GetFailedJobCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.FailedJobs
|
|
}
|
|
|
|
// GetStreamCallCount returns the count of stream calls
|
|
func (m *MockPlugin) GetStreamCallCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.ExecuteJobStreamCalls
|
|
}
|
|
|
|
// GetHealthReportCount returns the count of health reports sent
|
|
func (m *MockPlugin) GetHealthReportCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.ReportHealthCalls
|
|
}
|
|
|
|
// EnableErrorSimulation enables error simulation
|
|
func (m *MockPlugin) EnableErrorSimulation(errorType string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.SimulateError = true
|
|
m.SimulateErrorType = errorType
|
|
}
|
|
|
|
// DisableErrorSimulation disables error simulation
|
|
func (m *MockPlugin) DisableErrorSimulation() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.SimulateError = false
|
|
m.SimulateErrorType = ""
|
|
}
|
|
|
|
// Reset clears all counters and state
|
|
func (m *MockPlugin) Reset() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.ActiveJobs = make(map[string]*MockJobExecution)
|
|
m.CompletedJobs = 0
|
|
m.FailedJobs = 0
|
|
m.ConnectStreamCalls = 0
|
|
m.ExecuteJobStreamCalls = 0
|
|
m.ReportHealthCalls = 0
|
|
m.GetConfigCalls = 0
|
|
m.SubmitResultCalls = 0
|
|
m.ReceivedJobs = make([]*plugin_pb.ExecuteJobRequest, 0)
|
|
m.ReceivedHealthReports = make([]*plugin_pb.HealthReport, 0)
|
|
m.LastError = ""
|
|
m.SimulateError = false
|
|
m.SimulateErrorType = ""
|
|
}
|
|
|
|
// GetJobExecution returns execution details for a job
|
|
func (m *MockPlugin) GetJobExecution(jobID string) *MockJobExecution {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.ActiveJobs[jobID]
|
|
}
|
|
|
|
// TrackJob records a received job
|
|
func (m *MockPlugin) TrackJob(req *plugin_pb.ExecuteJobRequest) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.ReceivedJobs = append(m.ReceivedJobs, req)
|
|
}
|
|
|
|
// GetReceivedJobCount returns the count of received jobs
|
|
func (m *MockPlugin) GetReceivedJobCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return len(m.ReceivedJobs)
|
|
}
|
|
|
|
// SimulateStreamError simulates an error during streaming
|
|
func (m *MockPlugin) SimulateStreamError(reason error) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.LastError = reason.Error()
|
|
}
|
|
|
|
// SetStatus sets the plugin status
|
|
func (m *MockPlugin) SetStatus(status string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.Status = status
|
|
}
|
|
|
|
// GetStatus returns the plugin status
|
|
func (m *MockPlugin) GetStatus() string {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.Status
|
|
}
|