mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-17 07:11:30 +00:00
343 lines
9.2 KiB
Go
343 lines
9.2 KiB
Go
package testing
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/protobuf/types/known/durationpb"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
)
|
|
|
|
// MockPluginService simulates the admin-side PluginService for testing
|
|
type MockPluginService struct {
|
|
mu sync.RWMutex
|
|
plugins map[string]*MockPluginInstance
|
|
jobs map[string]*MockJob
|
|
jobCounter int
|
|
heartbeats map[string]*plugin_pb.HealthReport
|
|
lastHeartbeatTime map[string]time.Time
|
|
jobDispatchCalls int
|
|
registrationCalls int
|
|
receivedHealthReports []plugin_pb.HealthReport
|
|
}
|
|
|
|
// MockPluginInstance tracks a registered plugin
|
|
type MockPluginInstance struct {
|
|
ID string
|
|
Name string
|
|
Version string
|
|
Status string
|
|
Capabilities []string
|
|
MaxConcurrentJobs int
|
|
ConnectedAt time.Time
|
|
LastHeartbeat time.Time
|
|
ActiveJobCount int
|
|
CompletedJobCount int
|
|
FailedJobCount int
|
|
CapabilitiesDetail *plugin_pb.PluginCapabilities
|
|
Metadata map[string]string
|
|
}
|
|
|
|
// MockJob represents a job dispatched to a plugin
|
|
type MockJob struct {
|
|
ID string
|
|
Type string
|
|
PluginID string
|
|
Payload *plugin_pb.JobPayload
|
|
Timeout time.Duration
|
|
RetryCount int
|
|
Context map[string]string
|
|
Status plugin_pb.ExecutionStatus
|
|
DispatchedAt time.Time
|
|
ExecutedAt *time.Time
|
|
Result *plugin_pb.JobResult
|
|
ResultMessage string
|
|
StreamCalls int
|
|
}
|
|
|
|
// NewMockPluginService creates a new mock admin service
|
|
func NewMockPluginService() *MockPluginService {
|
|
return &MockPluginService{
|
|
plugins: make(map[string]*MockPluginInstance),
|
|
jobs: make(map[string]*MockJob),
|
|
heartbeats: make(map[string]*plugin_pb.HealthReport),
|
|
lastHeartbeatTime: make(map[string]time.Time),
|
|
receivedHealthReports: make([]plugin_pb.HealthReport, 0),
|
|
}
|
|
}
|
|
|
|
// Connect handles plugin registration
|
|
func (m *MockPluginService) Connect(ctx context.Context, req *plugin_pb.PluginConnectRequest) (*plugin_pb.PluginConnectResponse, error) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
m.registrationCalls++
|
|
|
|
// Register the plugin
|
|
plugin := &MockPluginInstance{
|
|
ID: req.PluginId,
|
|
Name: req.PluginName,
|
|
Version: req.Version,
|
|
Status: "connected",
|
|
Capabilities: req.Capabilities,
|
|
MaxConcurrentJobs: int(req.MaxConcurrentJobs),
|
|
ConnectedAt: time.Now(),
|
|
LastHeartbeat: time.Now(),
|
|
CapabilitiesDetail: req.CapabilitiesDetail,
|
|
Metadata: req.Metadata,
|
|
}
|
|
m.plugins[req.PluginId] = plugin
|
|
m.lastHeartbeatTime[req.PluginId] = time.Now()
|
|
|
|
// Build response with assigned types
|
|
assignedTypes := req.Capabilities
|
|
|
|
config := &plugin_pb.PluginConfig{
|
|
PluginId: req.PluginId,
|
|
Properties: make(map[string]string),
|
|
JobTypes: make([]*plugin_pb.JobTypeConfig, 0),
|
|
}
|
|
|
|
return &plugin_pb.PluginConnectResponse{
|
|
Success: true,
|
|
Message: "Plugin registered successfully",
|
|
MasterId: "mock-master-001",
|
|
AssignedTypes: assignedTypes,
|
|
Config: config,
|
|
}, nil
|
|
}
|
|
|
|
// SimulateJobExecution simulates job execution
|
|
func (m *MockPluginService) SimulateJobExecution(req *plugin_pb.ExecuteJobRequest) error {
|
|
m.mu.Lock()
|
|
|
|
m.jobDispatchCalls++
|
|
|
|
// Create job entry
|
|
job := &MockJob{
|
|
ID: req.JobId,
|
|
Type: req.JobType,
|
|
Payload: req.Payload,
|
|
Timeout: durationFromProto(req.Timeout),
|
|
RetryCount: int(req.RetryCount),
|
|
Context: req.Context,
|
|
DispatchedAt: time.Now(),
|
|
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_ACCEPTED,
|
|
StreamCalls: 0,
|
|
}
|
|
m.jobs[req.JobId] = job
|
|
m.mu.Unlock()
|
|
|
|
// Simulate job execution
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// Update job status
|
|
m.mu.Lock()
|
|
job.StreamCalls++
|
|
job.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_RUNNING
|
|
m.mu.Unlock()
|
|
|
|
// Simulate processing
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
m.mu.Lock()
|
|
job.StreamCalls++
|
|
job.Status = plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED
|
|
job.ResultMessage = "Job completed successfully"
|
|
now := time.Now()
|
|
job.ExecutedAt = &now
|
|
m.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// ExecuteJob simulates job dispatch
|
|
func (m *MockPluginService) ExecuteJob(ctx context.Context, req *plugin_pb.ExecuteJobRequest) (*plugin_pb.ExecuteJobResponse, error) {
|
|
m.mu.Lock()
|
|
m.jobDispatchCalls++
|
|
m.mu.Unlock()
|
|
|
|
return &plugin_pb.ExecuteJobResponse{
|
|
JobId: req.JobId,
|
|
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_ACCEPTED,
|
|
Message: "Job accepted",
|
|
}, nil
|
|
}
|
|
|
|
// ReportHealth handles plugin health reports
|
|
func (m *MockPluginService) ReportHealth(ctx context.Context, report *plugin_pb.HealthReport) (*plugin_pb.HealthReportResponse, error) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
m.heartbeats[report.PluginId] = report
|
|
m.lastHeartbeatTime[report.PluginId] = time.Now()
|
|
m.receivedHealthReports = append(m.receivedHealthReports, *report)
|
|
|
|
// Update plugin status
|
|
if plugin, ok := m.plugins[report.PluginId]; ok {
|
|
plugin.LastHeartbeat = time.Now()
|
|
plugin.ActiveJobCount = int(report.ActiveJobs)
|
|
}
|
|
|
|
return &plugin_pb.HealthReportResponse{
|
|
Acknowledged: true,
|
|
Feedback: "Health report received",
|
|
}, nil
|
|
}
|
|
|
|
// GetConfig handles config retrieval
|
|
func (m *MockPluginService) GetConfig(ctx context.Context, req *plugin_pb.GetConfigRequest) (*plugin_pb.GetConfigResponse, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
config := &plugin_pb.PluginConfig{
|
|
PluginId: req.PluginId,
|
|
Properties: make(map[string]string),
|
|
JobTypes: make([]*plugin_pb.JobTypeConfig, 0),
|
|
}
|
|
|
|
return &plugin_pb.GetConfigResponse{
|
|
Config: config,
|
|
Version: 1,
|
|
}, nil
|
|
}
|
|
|
|
// SubmitResult handles job result submission
|
|
func (m *MockPluginService) SubmitResult(ctx context.Context, req *plugin_pb.JobResultRequest) (*plugin_pb.JobResultResponse, error) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if job, ok := m.jobs[req.JobId]; ok {
|
|
job.Status = req.Status
|
|
job.Result = req.Result
|
|
job.ResultMessage = req.Message
|
|
}
|
|
|
|
return &plugin_pb.JobResultResponse{
|
|
Acknowledged: true,
|
|
ActionsToTake: []string{},
|
|
}, nil
|
|
}
|
|
|
|
// GetRegistrationCount returns how many times Connect was called
|
|
func (m *MockPluginService) GetRegistrationCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.registrationCalls
|
|
}
|
|
|
|
// GetJobDispatchCount returns how many times ExecuteJob was called
|
|
func (m *MockPluginService) GetJobDispatchCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.jobDispatchCalls
|
|
}
|
|
|
|
// GetPluginCount returns the number of registered plugins
|
|
func (m *MockPluginService) GetPluginCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return len(m.plugins)
|
|
}
|
|
|
|
// GetPlugin returns a registered plugin by ID
|
|
func (m *MockPluginService) GetPlugin(pluginID string) *MockPluginInstance {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.plugins[pluginID]
|
|
}
|
|
|
|
// GetJob returns a dispatched job by ID
|
|
func (m *MockPluginService) GetJob(jobID string) *MockJob {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.jobs[jobID]
|
|
}
|
|
|
|
// GetJobCount returns the total number of dispatched jobs
|
|
func (m *MockPluginService) GetJobCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return len(m.jobs)
|
|
}
|
|
|
|
// GetLastHeartbeat returns the last heartbeat time for a plugin
|
|
func (m *MockPluginService) GetLastHeartbeat(pluginID string) *time.Time {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
if t, ok := m.lastHeartbeatTime[pluginID]; ok {
|
|
return &t
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetHeartbeatCount returns how many heartbeats have been received
|
|
func (m *MockPluginService) GetHeartbeatCount() int {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return len(m.receivedHealthReports)
|
|
}
|
|
|
|
// ResetCounters resets all counters for a fresh test
|
|
func (m *MockPluginService) ResetCounters() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.registrationCalls = 0
|
|
m.jobDispatchCalls = 0
|
|
m.plugins = make(map[string]*MockPluginInstance)
|
|
m.jobs = make(map[string]*MockJob)
|
|
m.heartbeats = make(map[string]*plugin_pb.HealthReport)
|
|
m.lastHeartbeatTime = make(map[string]time.Time)
|
|
m.receivedHealthReports = make([]plugin_pb.HealthReport, 0)
|
|
}
|
|
|
|
// VerifyJobCompleted checks if a job was completed successfully
|
|
func (m *MockPluginService) VerifyJobCompleted(jobID string) bool {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
job, ok := m.jobs[jobID]
|
|
if !ok {
|
|
return false
|
|
}
|
|
return job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED
|
|
}
|
|
|
|
// VerifyJobFailed checks if a job failed
|
|
func (m *MockPluginService) VerifyJobFailed(jobID string) bool {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
job, ok := m.jobs[jobID]
|
|
if !ok {
|
|
return false
|
|
}
|
|
return job.Status == plugin_pb.ExecutionStatus_EXECUTION_STATUS_FAILED
|
|
}
|
|
|
|
// GetJobStatus returns the current status of a job
|
|
func (m *MockPluginService) GetJobStatus(jobID string) plugin_pb.ExecutionStatus {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
if job, ok := m.jobs[jobID]; ok {
|
|
return job.Status
|
|
}
|
|
return plugin_pb.ExecutionStatus_EXECUTION_STATUS_UNKNOWN
|
|
}
|
|
|
|
// VerifyPluginRegistered checks if a plugin is registered
|
|
func (m *MockPluginService) VerifyPluginRegistered(pluginID string) bool {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
_, ok := m.plugins[pluginID]
|
|
return ok
|
|
}
|
|
|
|
// durationFromProto converts proto Duration to time.Duration
|
|
func durationFromProto(d *durationpb.Duration) time.Duration {
|
|
if d == nil {
|
|
return 0
|
|
}
|
|
return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)
|
|
}
|