From f57992b81e9cd93a70e2ca135bdebab06294a79e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 17 Feb 2026 01:44:52 -0800 Subject: [PATCH] feat(plugin): Add test harness for plugin testing --- weed/admin/plugin/testing/harness.go | 525 +++++++++++++++++++++++++++ 1 file changed, 525 insertions(+) create mode 100644 weed/admin/plugin/testing/harness.go diff --git a/weed/admin/plugin/testing/harness.go b/weed/admin/plugin/testing/harness.go new file mode 100644 index 000000000..7cff50b30 --- /dev/null +++ b/weed/admin/plugin/testing/harness.go @@ -0,0 +1,525 @@ +package testing + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" +) + +// TestHarness provides a complete testing environment for plugins +type TestHarness struct { + mu sync.RWMutex + adminService *MockPluginService + plugins map[string]*MockPlugin + registrations map[string]*RegistrationRecord + jobs map[string]*JobTracker + detections map[string][]*DetectionRecord + executions map[string]*ExecutionRecord + startTime time.Time + timeout time.Duration + testName string + registrationWait time.Duration + executionWait time.Duration + expectedPlugins int + registeredPlugins int + failureReasons []string +} + +// RegistrationRecord tracks plugin registration details +type RegistrationRecord struct { + PluginID string + RegisteredAt time.Time + Version string + Capabilities []string + MaxConcurrentJobs int + Status string +} + +// JobTracker tracks job lifecycle +type JobTracker struct { + JobID string + Type string + PluginID string + Status plugin_pb.ExecutionStatus + CreatedAt time.Time + StartedAt *time.Time + CompletedAt *time.Time + Result *plugin_pb.JobResult + ErrorMessage string + Detections []*DetectionRecord +} + +// ExecutionRecord tracks execution details +type ExecutionRecord struct { + ResourceID string + Type string + ExecutedAt time.Time + CompletedAt *time.Time + Success bool + ErrorMessage string + Data []byte +} + +// NewTestHarness creates a new test harness +func NewTestHarness(testName string) *TestHarness { + return &TestHarness{ + testName: testName, + adminService: NewMockPluginService(), + plugins: make(map[string]*MockPlugin), + registrations: make(map[string]*RegistrationRecord), + jobs: make(map[string]*JobTracker), + detections: make(map[string][]*DetectionRecord), + executions: make(map[string]*ExecutionRecord), + startTime: time.Now(), + timeout: 10 * time.Second, + registrationWait: 100 * time.Millisecond, + executionWait: 100 * time.Millisecond, + failureReasons: make([]string, 0), + } +} + +// SetTimeout sets the overall test timeout +func (h *TestHarness) SetTimeout(timeout time.Duration) { + h.mu.Lock() + defer h.mu.Unlock() + h.timeout = timeout +} + +// SetRegistrationWait sets the wait time for plugin registration +func (h *TestHarness) SetRegistrationWait(duration time.Duration) { + h.mu.Lock() + defer h.mu.Unlock() + h.registrationWait = duration +} + +// SetExecutionWait sets the wait time for job execution +func (h *TestHarness) SetExecutionWait(duration time.Duration) { + h.mu.Lock() + defer h.mu.Unlock() + h.executionWait = duration +} + +// RegisterPlugin simulates plugin registration +func (h *TestHarness) RegisterPlugin(plugin *MockPlugin) error { + h.mu.Lock() + + if plugin == nil { + h.failureReasons = append(h.failureReasons, "plugin is nil") + h.mu.Unlock() + return fmt.Errorf("plugin is nil") + } + + h.plugins[plugin.ID] = plugin + h.mu.Unlock() + + // Simulate registration with admin service + req := &plugin_pb.PluginConnectRequest{ + PluginId: plugin.ID, + PluginName: plugin.Name, + Version: plugin.Version, + Capabilities: plugin.Capabilities, + CapabilitiesDetail: plugin.CapabilitiesDetail, + MaxConcurrentJobs: int32(plugin.MaxConcurrentJobs), + } + + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() + + resp, err := h.adminService.Connect(ctx, req) + if err != nil { + h.mu.Lock() + h.failureReasons = append(h.failureReasons, fmt.Sprintf("registration failed: %v", err)) + h.mu.Unlock() + return err + } + + if !resp.Success { + h.mu.Lock() + h.failureReasons = append(h.failureReasons, "registration response was not successful") + h.mu.Unlock() + return fmt.Errorf("registration failed: %s", resp.Message) + } + + h.mu.Lock() + h.registrations[plugin.ID] = &RegistrationRecord{ + PluginID: plugin.ID, + RegisteredAt: time.Now(), + Version: plugin.Version, + Capabilities: plugin.Capabilities, + MaxConcurrentJobs: plugin.MaxConcurrentJobs, + Status: "registered", + } + h.registeredPlugins++ + h.mu.Unlock() + + return nil +} + +// RegisterMultiplePlugins registers multiple plugins +func (h *TestHarness) RegisterMultiplePlugins(plugins ...*MockPlugin) error { + for _, plugin := range plugins { + if err := h.RegisterPlugin(plugin); err != nil { + return err + } + } + return nil +} + +// ExpectPlugins sets the expected number of plugins +func (h *TestHarness) ExpectPlugins(count int) { + h.mu.Lock() + defer h.mu.Unlock() + h.expectedPlugins = count +} + +// DispatchJob sends a job to a plugin +func (h *TestHarness) DispatchJob(pluginID string, jobType string, payload *plugin_pb.JobPayload) (string, error) { + h.mu.RLock() + plugin, ok := h.plugins[pluginID] + h.mu.RUnlock() + + if !ok { + return "", fmt.Errorf("plugin not found: %s", pluginID) + } + + jobID := fmt.Sprintf("job-%d-%d", len(h.jobs), time.Now().UnixNano()) + + req := &plugin_pb.ExecuteJobRequest{ + JobId: jobID, + JobType: jobType, + Payload: payload, + } + + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() + + // Create mock stream + mockStream := &MockExecuteJobStream{ + responses: make([]*plugin_pb.ExecuteJobResponse, 0), + } + + err := h.adminService.ExecuteJob(req, mockStream) + if err != nil { + h.mu.Lock() + h.failureReasons = append(h.failureReasons, fmt.Sprintf("job dispatch failed: %v", err)) + h.mu.Unlock() + return "", err + } + + // Wait for job to complete + time.Sleep(h.executionWait) + + // Verify job execution + plugin.TrackJob(req) + + executionErr := plugin.ExecuteJob(ctx, jobID, jobType, payload) + + h.mu.Lock() + h.jobs[jobID] = &JobTracker{ + JobID: jobID, + Type: jobType, + PluginID: pluginID, + Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_RUNNING, + CreatedAt: time.Now(), + } + h.mu.Unlock() + + // Simulate completion after a small delay + time.Sleep(h.executionWait) + + h.mu.Lock() + h.jobs[jobID].Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED + now := time.Now() + h.jobs[jobID].CompletedAt = &now + h.mu.Unlock() + + if executionErr != nil { + h.mu.Lock() + h.jobs[jobID].Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED + h.jobs[jobID].ErrorMessage = executionErr.Error() + h.mu.Unlock() + } + + return jobID, nil +} + +// VerifyRegistration checks if a plugin was registered +func (h *TestHarness) VerifyRegistration(pluginID string) bool { + h.mu.RLock() + defer h.mu.RUnlock() + _, ok := h.registrations[pluginID] + return ok +} + +// VerifyJobCompleted checks if a job completed successfully +func (h *TestHarness) VerifyJobCompleted(jobID string) bool { + h.mu.RLock() + defer h.mu.RUnlock() + + job, ok := h.jobs[jobID] + if !ok { + return false + } + + return job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED +} + +// VerifyJobFailed checks if a job failed +func (h *TestHarness) VerifyJobFailed(jobID string) bool { + h.mu.RLock() + defer h.mu.RUnlock() + + job, ok := h.jobs[jobID] + if !ok { + return false + } + + return job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED +} + +// VerifyPluginCapability checks if a plugin has a capability +func (h *TestHarness) VerifyPluginCapability(pluginID string, capability string) bool { + h.mu.RLock() + defer h.mu.RUnlock() + + reg, ok := h.registrations[pluginID] + if !ok { + return false + } + + for _, cap := range reg.Capabilities { + if cap == capability { + return true + } + } + + return false +} + +// GetJobStatus returns the status of a job +func (h *TestHarness) GetJobStatus(jobID string) plugin_pb.ExecutionStatus { + h.mu.RLock() + defer h.mu.RUnlock() + + job, ok := h.jobs[jobID] + if !ok { + return plugin_pb.ExecutionStatus_EXECUTION_STATUS_UNKNOWN + } + + return job.Status +} + +// GetPlugin returns a registered plugin +func (h *TestHarness) GetPlugin(pluginID string) *MockPlugin { + h.mu.RLock() + defer h.mu.RUnlock() + return h.plugins[pluginID] +} + +// GetRegistrationCount returns the number of registered plugins +func (h *TestHarness) GetRegistrationCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return h.registeredPlugins +} + +// GetJobCount returns the total number of jobs dispatched +func (h *TestHarness) GetJobCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.jobs) +} + +// SimulateDetection simulates detection results +func (h *TestHarness) SimulateDetection(pluginID string, result *DetectionResult) error { + h.mu.RLock() + plugin, ok := h.plugins[pluginID] + h.mu.RUnlock() + + if !ok { + return fmt.Errorf("plugin not found: %s", pluginID) + } + + plugin.AddDetectionResult(result.ResourceID, result.DetectionType, result.Severity, result.Description, result.Data) + + h.mu.Lock() + if _, exists := h.detections[pluginID]; !exists { + h.detections[pluginID] = make([]*DetectionRecord, 0) + } + h.detections[pluginID] = append(h.detections[pluginID], result) + h.mu.Unlock() + + return nil +} + +// GetAdminService returns the underlying admin service +func (h *TestHarness) GetAdminService() *MockPluginService { + h.mu.RLock() + defer h.mu.RUnlock() + return h.adminService +} + +// GetTestDuration returns the elapsed test time +func (h *TestHarness) GetTestDuration() time.Duration { + h.mu.RLock() + defer h.mu.RUnlock() + return time.Since(h.startTime) +} + +// ReportFailure records a test failure reason +func (h *TestHarness) ReportFailure(reason string) { + h.mu.Lock() + defer h.mu.Unlock() + h.failureReasons = append(h.failureReasons, reason) +} + +// HasFailures checks if any failures were recorded +func (h *TestHarness) HasFailures() bool { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.failureReasons) > 0 +} + +// GetFailures returns all recorded failures +func (h *TestHarness) GetFailures() []string { + h.mu.RLock() + defer h.mu.RUnlock() + failures := make([]string, len(h.failureReasons)) + copy(failures, h.failureReasons) + return failures +} + +// WaitForRegistration waits for a specific number of plugins to register +func (h *TestHarness) WaitForRegistration(count int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for { + h.mu.RLock() + current := h.registeredPlugins + h.mu.RUnlock() + + if current >= count { + return true + } + + if time.Now().After(deadline) { + return false + } + + time.Sleep(10 * time.Millisecond) + } +} + +// VerifyAdminServiceStats checks admin service statistics +func (h *TestHarness) VerifyAdminServiceStats(regCount, jobCount int) bool { + return h.adminService.GetRegistrationCount() == regCount && + h.adminService.GetJobDispatchCount() == jobCount +} + +// GetCompletedJobCount returns the number of completed jobs +func (h *TestHarness) GetCompletedJobCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + + count := 0 + for _, job := range h.jobs { + if job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED { + count++ + } + } + return count +} + +// GetFailedJobCount returns the number of failed jobs +func (h *TestHarness) GetFailedJobCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + + count := 0 + for _, job := range h.jobs { + if job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED { + count++ + } + } + return count +} + +// Cleanup performs cleanup after a test +func (h *TestHarness) Cleanup() { + h.mu.Lock() + defer h.mu.Unlock() + + // Reset all plugins + for _, plugin := range h.plugins { + plugin.Reset() + } + + // Reset admin service + h.adminService.ResetCounters() + + // Clear tracking + h.registrations = make(map[string]*RegistrationRecord) + h.jobs = make(map[string]*JobTracker) + h.detections = make(map[string][]*DetectionRecord) + h.executions = make(map[string]*ExecutionRecord) + h.failureReasons = make([]string, 0) + h.registeredPlugins = 0 + h.startTime = time.Now() +} + +// MockExecuteJobStream is a mock implementation of the ExecuteJob stream +type MockExecuteJobStream struct { + responses []*plugin_pb.ExecuteJobResponse + mu sync.Mutex +} + +// Send sends a response on the stream +func (m *MockExecuteJobStream) Send(resp *plugin_pb.ExecuteJobResponse) error { + m.mu.Lock() + defer m.mu.Unlock() + m.responses = append(m.responses, resp) + return nil +} + +// Recv receives a response from the stream +func (m *MockExecuteJobStream) Recv() (*plugin_pb.ExecuteJobResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.responses) == 0 { + return nil, fmt.Errorf("no responses") + } + resp := m.responses[0] + m.responses = m.responses[1:] + return resp, nil +} + +// SetHeader sets the metadata header +func (m *MockExecuteJobStream) SetHeader(map[string][]string) error { + return nil +} + +// SendHeader sends the metadata header +func (m *MockExecuteJobStream) SendHeader(map[string][]string) error { + return nil +} + +// SetTrailer sets the metadata trailer +func (m *MockExecuteJobStream) SetTrailer(map[string][]string) { +} + +// Context returns the context +func (m *MockExecuteJobStream) Context() context.Context { + return context.Background() +} + +// SendMsg sends a message on the stream +func (m *MockExecuteJobStream) SendMsg(interface{}) error { + return nil +} + +// RecvMsg receives a message from the stream +func (m *MockExecuteJobStream) RecvMsg(interface{}) error { + return nil +}