mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-21 17:21:34 +00:00
feat(plugin): Add test harness for plugin testing
This commit is contained in:
525
weed/admin/plugin/testing/harness.go
Normal file
525
weed/admin/plugin/testing/harness.go
Normal file
@@ -0,0 +1,525 @@
|
||||
package testing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
||||
)
|
||||
|
||||
// TestHarness provides a complete testing environment for plugins
|
||||
type TestHarness struct {
|
||||
mu sync.RWMutex
|
||||
adminService *MockPluginService
|
||||
plugins map[string]*MockPlugin
|
||||
registrations map[string]*RegistrationRecord
|
||||
jobs map[string]*JobTracker
|
||||
detections map[string][]*DetectionRecord
|
||||
executions map[string]*ExecutionRecord
|
||||
startTime time.Time
|
||||
timeout time.Duration
|
||||
testName string
|
||||
registrationWait time.Duration
|
||||
executionWait time.Duration
|
||||
expectedPlugins int
|
||||
registeredPlugins int
|
||||
failureReasons []string
|
||||
}
|
||||
|
||||
// RegistrationRecord tracks plugin registration details
|
||||
type RegistrationRecord struct {
|
||||
PluginID string
|
||||
RegisteredAt time.Time
|
||||
Version string
|
||||
Capabilities []string
|
||||
MaxConcurrentJobs int
|
||||
Status string
|
||||
}
|
||||
|
||||
// JobTracker tracks job lifecycle
|
||||
type JobTracker struct {
|
||||
JobID string
|
||||
Type string
|
||||
PluginID string
|
||||
Status plugin_pb.ExecutionStatus
|
||||
CreatedAt time.Time
|
||||
StartedAt *time.Time
|
||||
CompletedAt *time.Time
|
||||
Result *plugin_pb.JobResult
|
||||
ErrorMessage string
|
||||
Detections []*DetectionRecord
|
||||
}
|
||||
|
||||
// ExecutionRecord tracks execution details
|
||||
type ExecutionRecord struct {
|
||||
ResourceID string
|
||||
Type string
|
||||
ExecutedAt time.Time
|
||||
CompletedAt *time.Time
|
||||
Success bool
|
||||
ErrorMessage string
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// NewTestHarness creates a new test harness
|
||||
func NewTestHarness(testName string) *TestHarness {
|
||||
return &TestHarness{
|
||||
testName: testName,
|
||||
adminService: NewMockPluginService(),
|
||||
plugins: make(map[string]*MockPlugin),
|
||||
registrations: make(map[string]*RegistrationRecord),
|
||||
jobs: make(map[string]*JobTracker),
|
||||
detections: make(map[string][]*DetectionRecord),
|
||||
executions: make(map[string]*ExecutionRecord),
|
||||
startTime: time.Now(),
|
||||
timeout: 10 * time.Second,
|
||||
registrationWait: 100 * time.Millisecond,
|
||||
executionWait: 100 * time.Millisecond,
|
||||
failureReasons: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// SetTimeout sets the overall test timeout
|
||||
func (h *TestHarness) SetTimeout(timeout time.Duration) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.timeout = timeout
|
||||
}
|
||||
|
||||
// SetRegistrationWait sets the wait time for plugin registration
|
||||
func (h *TestHarness) SetRegistrationWait(duration time.Duration) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.registrationWait = duration
|
||||
}
|
||||
|
||||
// SetExecutionWait sets the wait time for job execution
|
||||
func (h *TestHarness) SetExecutionWait(duration time.Duration) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.executionWait = duration
|
||||
}
|
||||
|
||||
// RegisterPlugin simulates plugin registration
|
||||
func (h *TestHarness) RegisterPlugin(plugin *MockPlugin) error {
|
||||
h.mu.Lock()
|
||||
|
||||
if plugin == nil {
|
||||
h.failureReasons = append(h.failureReasons, "plugin is nil")
|
||||
h.mu.Unlock()
|
||||
return fmt.Errorf("plugin is nil")
|
||||
}
|
||||
|
||||
h.plugins[plugin.ID] = plugin
|
||||
h.mu.Unlock()
|
||||
|
||||
// Simulate registration with admin service
|
||||
req := &plugin_pb.PluginConnectRequest{
|
||||
PluginId: plugin.ID,
|
||||
PluginName: plugin.Name,
|
||||
Version: plugin.Version,
|
||||
Capabilities: plugin.Capabilities,
|
||||
CapabilitiesDetail: plugin.CapabilitiesDetail,
|
||||
MaxConcurrentJobs: int32(plugin.MaxConcurrentJobs),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := h.adminService.Connect(ctx, req)
|
||||
if err != nil {
|
||||
h.mu.Lock()
|
||||
h.failureReasons = append(h.failureReasons, fmt.Sprintf("registration failed: %v", err))
|
||||
h.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
if !resp.Success {
|
||||
h.mu.Lock()
|
||||
h.failureReasons = append(h.failureReasons, "registration response was not successful")
|
||||
h.mu.Unlock()
|
||||
return fmt.Errorf("registration failed: %s", resp.Message)
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
h.registrations[plugin.ID] = &RegistrationRecord{
|
||||
PluginID: plugin.ID,
|
||||
RegisteredAt: time.Now(),
|
||||
Version: plugin.Version,
|
||||
Capabilities: plugin.Capabilities,
|
||||
MaxConcurrentJobs: plugin.MaxConcurrentJobs,
|
||||
Status: "registered",
|
||||
}
|
||||
h.registeredPlugins++
|
||||
h.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterMultiplePlugins registers multiple plugins
|
||||
func (h *TestHarness) RegisterMultiplePlugins(plugins ...*MockPlugin) error {
|
||||
for _, plugin := range plugins {
|
||||
if err := h.RegisterPlugin(plugin); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExpectPlugins sets the expected number of plugins
|
||||
func (h *TestHarness) ExpectPlugins(count int) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.expectedPlugins = count
|
||||
}
|
||||
|
||||
// DispatchJob sends a job to a plugin
|
||||
func (h *TestHarness) DispatchJob(pluginID string, jobType string, payload *plugin_pb.JobPayload) (string, error) {
|
||||
h.mu.RLock()
|
||||
plugin, ok := h.plugins[pluginID]
|
||||
h.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return "", fmt.Errorf("plugin not found: %s", pluginID)
|
||||
}
|
||||
|
||||
jobID := fmt.Sprintf("job-%d-%d", len(h.jobs), time.Now().UnixNano())
|
||||
|
||||
req := &plugin_pb.ExecuteJobRequest{
|
||||
JobId: jobID,
|
||||
JobType: jobType,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
|
||||
defer cancel()
|
||||
|
||||
// Create mock stream
|
||||
mockStream := &MockExecuteJobStream{
|
||||
responses: make([]*plugin_pb.ExecuteJobResponse, 0),
|
||||
}
|
||||
|
||||
err := h.adminService.ExecuteJob(req, mockStream)
|
||||
if err != nil {
|
||||
h.mu.Lock()
|
||||
h.failureReasons = append(h.failureReasons, fmt.Sprintf("job dispatch failed: %v", err))
|
||||
h.mu.Unlock()
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Wait for job to complete
|
||||
time.Sleep(h.executionWait)
|
||||
|
||||
// Verify job execution
|
||||
plugin.TrackJob(req)
|
||||
|
||||
executionErr := plugin.ExecuteJob(ctx, jobID, jobType, payload)
|
||||
|
||||
h.mu.Lock()
|
||||
h.jobs[jobID] = &JobTracker{
|
||||
JobID: jobID,
|
||||
Type: jobType,
|
||||
PluginID: pluginID,
|
||||
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_RUNNING,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
// Simulate completion after a small delay
|
||||
time.Sleep(h.executionWait)
|
||||
|
||||
h.mu.Lock()
|
||||
h.jobs[jobID].Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED
|
||||
now := time.Now()
|
||||
h.jobs[jobID].CompletedAt = &now
|
||||
h.mu.Unlock()
|
||||
|
||||
if executionErr != nil {
|
||||
h.mu.Lock()
|
||||
h.jobs[jobID].Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED
|
||||
h.jobs[jobID].ErrorMessage = executionErr.Error()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
return jobID, nil
|
||||
}
|
||||
|
||||
// VerifyRegistration checks if a plugin was registered
|
||||
func (h *TestHarness) VerifyRegistration(pluginID string) bool {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
_, ok := h.registrations[pluginID]
|
||||
return ok
|
||||
}
|
||||
|
||||
// VerifyJobCompleted checks if a job completed successfully
|
||||
func (h *TestHarness) VerifyJobCompleted(jobID string) bool {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
job, ok := h.jobs[jobID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED
|
||||
}
|
||||
|
||||
// VerifyJobFailed checks if a job failed
|
||||
func (h *TestHarness) VerifyJobFailed(jobID string) bool {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
job, ok := h.jobs[jobID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED
|
||||
}
|
||||
|
||||
// VerifyPluginCapability checks if a plugin has a capability
|
||||
func (h *TestHarness) VerifyPluginCapability(pluginID string, capability string) bool {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
reg, ok := h.registrations[pluginID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, cap := range reg.Capabilities {
|
||||
if cap == capability {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// GetJobStatus returns the status of a job
|
||||
func (h *TestHarness) GetJobStatus(jobID string) plugin_pb.ExecutionStatus {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
job, ok := h.jobs[jobID]
|
||||
if !ok {
|
||||
return plugin_pb.ExecutionStatus_EXECUTION_STATUS_UNKNOWN
|
||||
}
|
||||
|
||||
return job.Status
|
||||
}
|
||||
|
||||
// GetPlugin returns a registered plugin
|
||||
func (h *TestHarness) GetPlugin(pluginID string) *MockPlugin {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return h.plugins[pluginID]
|
||||
}
|
||||
|
||||
// GetRegistrationCount returns the number of registered plugins
|
||||
func (h *TestHarness) GetRegistrationCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return h.registeredPlugins
|
||||
}
|
||||
|
||||
// GetJobCount returns the total number of jobs dispatched
|
||||
func (h *TestHarness) GetJobCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.jobs)
|
||||
}
|
||||
|
||||
// SimulateDetection simulates detection results
|
||||
func (h *TestHarness) SimulateDetection(pluginID string, result *DetectionResult) error {
|
||||
h.mu.RLock()
|
||||
plugin, ok := h.plugins[pluginID]
|
||||
h.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("plugin not found: %s", pluginID)
|
||||
}
|
||||
|
||||
plugin.AddDetectionResult(result.ResourceID, result.DetectionType, result.Severity, result.Description, result.Data)
|
||||
|
||||
h.mu.Lock()
|
||||
if _, exists := h.detections[pluginID]; !exists {
|
||||
h.detections[pluginID] = make([]*DetectionRecord, 0)
|
||||
}
|
||||
h.detections[pluginID] = append(h.detections[pluginID], result)
|
||||
h.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAdminService returns the underlying admin service
|
||||
func (h *TestHarness) GetAdminService() *MockPluginService {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return h.adminService
|
||||
}
|
||||
|
||||
// GetTestDuration returns the elapsed test time
|
||||
func (h *TestHarness) GetTestDuration() time.Duration {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return time.Since(h.startTime)
|
||||
}
|
||||
|
||||
// ReportFailure records a test failure reason
|
||||
func (h *TestHarness) ReportFailure(reason string) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.failureReasons = append(h.failureReasons, reason)
|
||||
}
|
||||
|
||||
// HasFailures checks if any failures were recorded
|
||||
func (h *TestHarness) HasFailures() bool {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.failureReasons) > 0
|
||||
}
|
||||
|
||||
// GetFailures returns all recorded failures
|
||||
func (h *TestHarness) GetFailures() []string {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
failures := make([]string, len(h.failureReasons))
|
||||
copy(failures, h.failureReasons)
|
||||
return failures
|
||||
}
|
||||
|
||||
// WaitForRegistration waits for a specific number of plugins to register
|
||||
func (h *TestHarness) WaitForRegistration(count int, timeout time.Duration) bool {
|
||||
deadline := time.Now().Add(timeout)
|
||||
for {
|
||||
h.mu.RLock()
|
||||
current := h.registeredPlugins
|
||||
h.mu.RUnlock()
|
||||
|
||||
if current >= count {
|
||||
return true
|
||||
}
|
||||
|
||||
if time.Now().After(deadline) {
|
||||
return false
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// VerifyAdminServiceStats checks admin service statistics
|
||||
func (h *TestHarness) VerifyAdminServiceStats(regCount, jobCount int) bool {
|
||||
return h.adminService.GetRegistrationCount() == regCount &&
|
||||
h.adminService.GetJobDispatchCount() == jobCount
|
||||
}
|
||||
|
||||
// GetCompletedJobCount returns the number of completed jobs
|
||||
func (h *TestHarness) GetCompletedJobCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
count := 0
|
||||
for _, job := range h.jobs {
|
||||
if job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// GetFailedJobCount returns the number of failed jobs
|
||||
func (h *TestHarness) GetFailedJobCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
count := 0
|
||||
for _, job := range h.jobs {
|
||||
if job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// Cleanup performs cleanup after a test
|
||||
func (h *TestHarness) Cleanup() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Reset all plugins
|
||||
for _, plugin := range h.plugins {
|
||||
plugin.Reset()
|
||||
}
|
||||
|
||||
// Reset admin service
|
||||
h.adminService.ResetCounters()
|
||||
|
||||
// Clear tracking
|
||||
h.registrations = make(map[string]*RegistrationRecord)
|
||||
h.jobs = make(map[string]*JobTracker)
|
||||
h.detections = make(map[string][]*DetectionRecord)
|
||||
h.executions = make(map[string]*ExecutionRecord)
|
||||
h.failureReasons = make([]string, 0)
|
||||
h.registeredPlugins = 0
|
||||
h.startTime = time.Now()
|
||||
}
|
||||
|
||||
// MockExecuteJobStream is a mock implementation of the ExecuteJob stream
|
||||
type MockExecuteJobStream struct {
|
||||
responses []*plugin_pb.ExecuteJobResponse
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// Send sends a response on the stream
|
||||
func (m *MockExecuteJobStream) Send(resp *plugin_pb.ExecuteJobResponse) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.responses = append(m.responses, resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Recv receives a response from the stream
|
||||
func (m *MockExecuteJobStream) Recv() (*plugin_pb.ExecuteJobResponse, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if len(m.responses) == 0 {
|
||||
return nil, fmt.Errorf("no responses")
|
||||
}
|
||||
resp := m.responses[0]
|
||||
m.responses = m.responses[1:]
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// SetHeader sets the metadata header
|
||||
func (m *MockExecuteJobStream) SetHeader(map[string][]string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendHeader sends the metadata header
|
||||
func (m *MockExecuteJobStream) SendHeader(map[string][]string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetTrailer sets the metadata trailer
|
||||
func (m *MockExecuteJobStream) SetTrailer(map[string][]string) {
|
||||
}
|
||||
|
||||
// Context returns the context
|
||||
func (m *MockExecuteJobStream) Context() context.Context {
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
// SendMsg sends a message on the stream
|
||||
func (m *MockExecuteJobStream) SendMsg(interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RecvMsg receives a message from the stream
|
||||
func (m *MockExecuteJobStream) RecvMsg(interface{}) error {
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user