// Source file: seaweedfs/weed/admin/plugin/testing/mock_admin.go
// (Go; listed as 343 lines, 9.2 KiB in the repository browser.)

package testing
import (
"context"
"sync"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
)
// MockPluginService is an in-memory stand-in for the admin-side PluginService,
// meant for tests. It records plugin registrations, dispatched jobs and health
// reports so they can be inspected afterwards. The methods in this file take mu
// before touching any other field.
type MockPluginService struct {
	// mu guards every field below.
	mu sync.RWMutex

	// plugins holds registered plugins keyed by plugin ID.
	plugins map[string]*MockPluginInstance
	// jobs holds simulated jobs keyed by job ID.
	jobs map[string]*MockJob
	// jobCounter is a spare job counter; nothing in this part of the file
	// increments it.
	jobCounter int

	// heartbeats keeps the most recent health report per plugin ID.
	heartbeats map[string]*plugin_pb.HealthReport
	// lastHeartbeatTime keeps the arrival time of the most recent
	// registration or health report per plugin ID.
	lastHeartbeatTime map[string]time.Time

	// jobDispatchCalls counts ExecuteJob and SimulateJobExecution invocations.
	jobDispatchCalls int
	// registrationCalls counts Connect invocations.
	registrationCalls int

	// receivedHealthReports keeps a by-value copy of every health report in
	// arrival order.
	// NOTE(review): generated protobuf messages are normally not meant to be
	// copied by value (go vet copylocks) — consider []*plugin_pb.HealthReport.
	receivedHealthReports []plugin_pb.HealthReport
}
// MockPluginInstance is the mock's record of one registered plugin. Connect
// creates it from the connect request; ReportHealth refreshes LastHeartbeat
// and ActiveJobCount.
type MockPluginInstance struct {
	// Identity, copied from the connect request.
	ID      string
	Name    string
	Version string

	// Status is set to "connected" by Connect.
	Status string

	// Capabilities and MaxConcurrentJobs are copied from the connect request.
	Capabilities      []string
	MaxConcurrentJobs int

	// ConnectedAt is stamped by Connect; LastHeartbeat is stamped by Connect
	// and again by every health report for this plugin.
	ConnectedAt   time.Time
	LastHeartbeat time.Time

	// ActiveJobCount mirrors the ActiveJobs value of the latest health report.
	ActiveJobCount int
	// CompletedJobCount and FailedJobCount are not written by any code
	// visible in this file.
	CompletedJobCount int
	FailedJobCount    int

	// CapabilitiesDetail and Metadata are stored as received (not copied).
	CapabilitiesDetail *plugin_pb.PluginCapabilities
	Metadata           map[string]string
}
// MockJob is the mock's record of one job. SimulateJobExecution creates it and
// walks it through its statuses; SubmitResult can overwrite Status, Result and
// ResultMessage.
type MockJob struct {
	// ID and Type come from the execute request.
	ID   string
	Type string
	// PluginID is not populated by any code visible in this file.
	PluginID string

	// Request parameters, stored as received.
	Payload    *plugin_pb.JobPayload
	Timeout    time.Duration
	RetryCount int
	Context    map[string]string

	// Status is the job's current execution status.
	Status plugin_pb.ExecutionStatus

	// DispatchedAt is stamped when the job record is created; ExecutedAt stays
	// nil until SimulateJobExecution marks the job completed.
	DispatchedAt time.Time
	ExecutedAt   *time.Time

	// Result and ResultMessage describe the outcome.
	Result        *plugin_pb.JobResult
	ResultMessage string

	// StreamCalls is incremented once per simulated status transition.
	StreamCalls int
}
// NewMockPluginService returns a ready-to-use mock whose maps and report slice
// are allocated and empty, and whose counters are zero.
func NewMockPluginService() *MockPluginService {
	svc := new(MockPluginService)
	svc.plugins = map[string]*MockPluginInstance{}
	svc.jobs = map[string]*MockJob{}
	svc.heartbeats = map[string]*plugin_pb.HealthReport{}
	svc.lastHeartbeatTime = map[string]time.Time{}
	svc.receivedHealthReports = []plugin_pb.HealthReport{}
	return svc
}
// Connect registers (or re-registers) the plugin described by req and returns
// a successful response that assigns the plugin every capability it asked for.
//
// The plugin record replaces any existing entry with the same plugin ID, and
// the registration counter is incremented on every call. ctx is not used.
// The returned error is always nil.
func (m *MockPluginService) Connect(ctx context.Context, req *plugin_pb.PluginConnectRequest) (*plugin_pb.PluginConnectResponse, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.registrationCalls++

	// Take a single timestamp so that ConnectedAt, LastHeartbeat and the
	// lastHeartbeatTime entry are exactly equal; separate time.Now() calls
	// would leave them a few nanoseconds apart and make equality checks in
	// tests flaky.
	now := time.Now()

	m.plugins[req.PluginId] = &MockPluginInstance{
		ID:                 req.PluginId,
		Name:               req.PluginName,
		Version:            req.Version,
		Status:             "connected",
		Capabilities:       req.Capabilities,
		MaxConcurrentJobs:  int(req.MaxConcurrentJobs),
		ConnectedAt:        now,
		LastHeartbeat:      now,
		CapabilitiesDetail: req.CapabilitiesDetail,
		Metadata:           req.Metadata,
	}
	m.lastHeartbeatTime[req.PluginId] = now

	// The mock grants every requested capability and hands back an empty
	// configuration for the plugin.
	return &plugin_pb.PluginConnectResponse{
		Success:       true,
		Message:       "Plugin registered successfully",
		MasterId:      "mock-master-001",
		AssignedTypes: req.Capabilities,
		Config: &plugin_pb.PluginConfig{
			PluginId:   req.PluginId,
			Properties: make(map[string]string),
			JobTypes:   make([]*plugin_pb.JobTypeConfig, 0),
		},
	}, nil
}
// SimulateJobExecution records a job for req and synchronously walks it through
// ACCEPTED -> RUNNING -> COMPLETED, sleeping 50ms before each transition. The
// mutex is released during the sleeps, so other methods can observe the
// intermediate states. It increments the job dispatch counter and always
// returns nil.
func (m *MockPluginService) SimulateJobExecution(req *plugin_pb.ExecuteJobRequest) error {
	const phaseDelay = 50 * time.Millisecond

	// locked runs fn while holding the write lock.
	locked := func(fn func()) {
		m.mu.Lock()
		defer m.mu.Unlock()
		fn()
	}

	var entry *MockJob

	// Phase 1: register the job as accepted.
	locked(func() {
		m.jobDispatchCalls++
		entry = &MockJob{
			ID:           req.JobId,
			Type:         req.JobType,
			Payload:      req.Payload,
			Timeout:      durationFromProto(req.Timeout),
			RetryCount:   int(req.RetryCount),
			Context:      req.Context,
			DispatchedAt: time.Now(),
			Status:       plugin_pb.ExecutionStatus_EXECUTION_STATUS_ACCEPTED,
			StreamCalls:  0,
		}
		m.jobs[req.JobId] = entry
	})

	// Phase 2: after a short delay the job starts running.
	time.Sleep(phaseDelay)
	locked(func() {
		entry.StreamCalls++
		entry.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_RUNNING
	})

	// Phase 3: after another delay the job completes.
	time.Sleep(phaseDelay)
	locked(func() {
		entry.StreamCalls++
		entry.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED
		entry.ResultMessage = "Job completed successfully"
		finishedAt := time.Now()
		entry.ExecutedAt = &finishedAt
	})

	return nil
}
// ExecuteJob counts the dispatch and immediately answers that the job was
// accepted. It does not create a job record; ctx is not used and the returned
// error is always nil.
func (m *MockPluginService) ExecuteJob(ctx context.Context, req *plugin_pb.ExecuteJobRequest) (*plugin_pb.ExecuteJobResponse, error) {
	m.mu.Lock()
	m.jobDispatchCalls++
	m.mu.Unlock()

	accepted := new(plugin_pb.ExecuteJobResponse)
	accepted.JobId = req.JobId
	accepted.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_ACCEPTED
	accepted.Message = "Job accepted"
	return accepted, nil
}
// ReportHealth records a health report from a plugin and acknowledges it.
//
// The report becomes the latest heartbeat for its plugin ID, its arrival time
// is stored, and a copy is appended to the list of all received reports. If
// the plugin is registered, its LastHeartbeat and ActiveJobCount are updated
// too. Reports from unregistered plugins are still recorded. ctx is not used
// and the returned error is always nil.
func (m *MockPluginService) ReportHealth(ctx context.Context, report *plugin_pb.HealthReport) (*plugin_pb.HealthReportResponse, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Use one timestamp so lastHeartbeatTime and the plugin's LastHeartbeat
	// are exactly equal; two time.Now() calls would differ slightly.
	now := time.Now()
	id := report.PluginId

	m.heartbeats[id] = report
	m.lastHeartbeatTime[id] = now
	// NOTE(review): this copies the protobuf message by value because the
	// backing slice holds values; go vet's copylocks check typically flags
	// that for generated messages. Changing it requires changing the field
	// type as well.
	m.receivedHealthReports = append(m.receivedHealthReports, *report)

	if plugin, ok := m.plugins[id]; ok {
		plugin.LastHeartbeat = now
		plugin.ActiveJobCount = int(report.ActiveJobs)
	}

	return &plugin_pb.HealthReportResponse{
		Acknowledged: true,
		Feedback:     "Health report received",
	}, nil
}
// GetConfig answers every request with an empty configuration (no properties,
// no job types) for the requested plugin ID at version 1. No stored state is
// consulted; ctx is not used and the returned error is always nil.
func (m *MockPluginService) GetConfig(ctx context.Context, req *plugin_pb.GetConfigRequest) (*plugin_pb.GetConfigResponse, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	resp := &plugin_pb.GetConfigResponse{
		Config: &plugin_pb.PluginConfig{
			PluginId:   req.PluginId,
			Properties: make(map[string]string),
			JobTypes:   make([]*plugin_pb.JobTypeConfig, 0),
		},
		Version: 1,
	}
	return resp, nil
}
// SubmitResult stores the reported status, result and message on the matching
// job record, if one exists. The submission is acknowledged either way, with
// an empty action list. ctx is not used and the returned error is always nil.
func (m *MockPluginService) SubmitResult(ctx context.Context, req *plugin_pb.JobResultRequest) (*plugin_pb.JobResultResponse, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	entry, known := m.jobs[req.JobId]
	if known {
		entry.Status = req.Status
		entry.Result = req.Result
		entry.ResultMessage = req.Message
	}

	ack := &plugin_pb.JobResultResponse{
		Acknowledged:  true,
		ActionsToTake: []string{},
	}
	return ack, nil
}
// GetRegistrationCount reports how many times Connect has been called since
// construction or the last ResetCounters.
func (m *MockPluginService) GetRegistrationCount() int {
	m.mu.RLock()
	calls := m.registrationCalls
	m.mu.RUnlock()
	return calls
}
// GetJobDispatchCount reports the number of job dispatches recorded so far.
// Both ExecuteJob and SimulateJobExecution increment this counter.
func (m *MockPluginService) GetJobDispatchCount() int {
	m.mu.RLock()
	dispatches := m.jobDispatchCalls
	m.mu.RUnlock()
	return dispatches
}
// GetPluginCount reports how many distinct plugin IDs are currently registered.
func (m *MockPluginService) GetPluginCount() int {
	m.mu.RLock()
	registered := len(m.plugins)
	m.mu.RUnlock()
	return registered
}
// GetPlugin looks up a registered plugin by ID and returns nil when the ID is
// unknown. The returned pointer is the mock's own record, not a copy, so reads
// of its fields after this call are no longer protected by the mock's mutex.
func (m *MockPluginService) GetPlugin(pluginID string) *MockPluginInstance {
	m.mu.RLock()
	instance := m.plugins[pluginID]
	m.mu.RUnlock()
	return instance
}
// GetJob looks up a job record by ID and returns nil when the ID is unknown.
// The returned pointer is the mock's own record, not a copy, so reads of its
// fields after this call are no longer protected by the mock's mutex.
func (m *MockPluginService) GetJob(jobID string) *MockJob {
	m.mu.RLock()
	entry := m.jobs[jobID]
	m.mu.RUnlock()
	return entry
}
// GetJobCount reports how many job records the mock currently holds.
func (m *MockPluginService) GetJobCount() int {
	m.mu.RLock()
	total := len(m.jobs)
	m.mu.RUnlock()
	return total
}
// GetLastHeartbeat returns the time of the most recent registration or health
// report for pluginID, or nil if none has been recorded. The pointer refers to
// a copy of the stored time, so later heartbeats do not change it.
func (m *MockPluginService) GetLastHeartbeat(pluginID string) *time.Time {
	m.mu.RLock()
	defer m.mu.RUnlock()

	seenAt, found := m.lastHeartbeatTime[pluginID]
	if !found {
		return nil
	}
	return &seenAt
}
// GetHeartbeatCount reports the total number of health reports received,
// across all plugins.
func (m *MockPluginService) GetHeartbeatCount() int {
	m.mu.RLock()
	received := len(m.receivedHealthReports)
	m.mu.RUnlock()
	return received
}
// ResetCounters returns the mock to a pristine state for a fresh test. Despite
// its name it clears more than counters: every call counter is zeroed and all
// registered plugins, job records, heartbeats and health reports are dropped.
//
// Fresh maps are allocated rather than cleared in place, so job or plugin
// pointers handed out earlier keep pointing at the old records.
func (m *MockPluginService) ResetCounters() {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Zero every counter, including jobCounter, which was previously left
	// untouched and could leak state from one test into the next.
	m.registrationCalls = 0
	m.jobDispatchCalls = 0
	m.jobCounter = 0

	m.plugins = make(map[string]*MockPluginInstance)
	m.jobs = make(map[string]*MockJob)
	m.heartbeats = make(map[string]*plugin_pb.HealthReport)
	m.lastHeartbeatTime = make(map[string]time.Time)
	m.receivedHealthReports = make([]plugin_pb.HealthReport, 0)
}
// VerifyJobCompleted reports whether a job with the given ID exists and its
// status is COMPLETED. Unknown job IDs yield false.
func (m *MockPluginService) VerifyJobCompleted(jobID string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	entry, known := m.jobs[jobID]
	return known && entry.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED
}
// VerifyJobFailed reports whether a job with the given ID exists and its
// status is FAILED. Unknown job IDs yield false.
func (m *MockPluginService) VerifyJobFailed(jobID string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	entry, known := m.jobs[jobID]
	return known && entry.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED
}
// GetJobStatus returns the current status of the job with the given ID, or
// EXECUTION_STATUS_UNKNOWN when no such job has been recorded.
func (m *MockPluginService) GetJobStatus(jobID string) plugin_pb.ExecutionStatus {
	m.mu.RLock()
	defer m.mu.RUnlock()

	entry, known := m.jobs[jobID]
	if !known {
		return plugin_pb.ExecutionStatus_EXECUTION_STATUS_UNKNOWN
	}
	return entry.Status
}
// VerifyPluginRegistered reports whether a plugin with the given ID is
// currently present in the registry.
func (m *MockPluginService) VerifyPluginRegistered(pluginID string) bool {
	m.mu.RLock()
	_, registered := m.plugins[pluginID]
	m.mu.RUnlock()
	return registered
}
// durationFromProto converts a protobuf Duration into a time.Duration.
// A nil input yields 0.
//
// The conversion is delegated to durationpb's AsDuration, which saturates at
// the minimum/maximum time.Duration when the value does not fit, instead of
// silently wrapping around as manual seconds*time.Second arithmetic would.
func durationFromProto(d *durationpb.Duration) time.Duration {
	if d == nil {
		return 0
	}
	return d.AsDuration()
}