feat(plugin): Add vacuum plugin implementation

This commit is contained in:
Chris Lu
2026-02-17 02:00:36 -08:00
parent 9b2fd24e52
commit e7efe201e3
4 changed files with 324 additions and 234 deletions

View File

@@ -1,35 +1,30 @@
package vacuum
import (
"sort"
"fmt"
)
// VacuumCandidate represents a volume eligible for vacuum
type VacuumCandidate struct {
VolumeID uint32
DataNodeID string
Size uint64
UsedSpace uint64
DeadSpace uint64
DeadSpacePercent float64
ReplicaCount int
RackID string
DataCenterID string
FileCount int64
LastModified int64
CanVacuum bool
FragmentationScore float64
Reason string
VolumeID uint32
DataNodeID string
Size uint64
UsedSpace uint64
DeadSpace uint64
DeadSpacePercent float32
FragmentationScore float32
RackID string
CanVacuum bool
Reason string
}
// DetectionOptions contains options for detection
type DetectionOptions struct {
MinVolumeSize uint64
MaxVolumeSize uint64
DeadSpaceThreshold int
TargetUtilization int
ExcludeNodes []string
PreferredNodes []string
DeadSpaceThreshold float32
PreferredNodes []string
ExcludeNodes []string
}
// Detector scans for vacuum candidates
@@ -55,79 +50,69 @@ func (d *Detector) DetectJobs(volumeMetrics map[uint32]*VolumeMetric) ([]*Vacuum
}
}
d.SortByFragmentation(candidates)
SortByFragmentation(candidates)
return candidates, nil
}
// evaluateVolume checks if a volume should be vacuumed
func (d *Detector) evaluateVolume(volumeID uint32, metric *VolumeMetric) (*VacuumCandidate, bool) {
deadSpace := metric.Size - metric.UsedSpace
deadSpacePercent := 0.0
if metric.Size > 0 {
deadSpacePercent = float64(deadSpace) * 100.0 / float64(metric.Size)
}
candidate := &VacuumCandidate{
VolumeID: volumeID,
DataNodeID: metric.DataNodeID,
Size: metric.Size,
UsedSpace: metric.UsedSpace,
DeadSpace: deadSpace,
DeadSpacePercent: deadSpacePercent,
ReplicaCount: metric.ReplicaCount,
RackID: metric.RackID,
DataCenterID: metric.DataCenterID,
FileCount: metric.FileCount,
LastModified: metric.LastModified,
VolumeID: volumeID,
DataNodeID: metric.DataNodeID,
Size: metric.Size,
UsedSpace: metric.UsedSpace,
RackID: metric.RackID,
}
// Check size constraints
if metric.Size < d.config.MinVolumeSize {
candidate.CanVacuum = false
candidate.Reason = "volume too small"
candidate.Reason = fmt.Sprintf("volume too small: %d < %d", metric.Size, d.config.MinVolumeSize)
return candidate, false
}
if metric.Size > d.config.MaxVolumeSize {
candidate.CanVacuum = false
candidate.Reason = "volume too large"
candidate.Reason = fmt.Sprintf("volume too large: %d > %d", metric.Size, d.config.MaxVolumeSize)
return candidate, false
}
if int(deadSpacePercent) < d.config.DeadSpaceThreshold {
// Calculate dead space
deadSpace := metric.Size - metric.UsedSpace
deadSpacePercent := float32(deadSpace) * 100 / float32(metric.Size)
candidate.DeadSpace = deadSpace
candidate.DeadSpacePercent = deadSpacePercent
// Check dead space threshold
if deadSpacePercent < d.config.DeadSpaceThreshold {
candidate.CanVacuum = false
candidate.Reason = "insufficient dead space"
candidate.Reason = fmt.Sprintf("insufficient dead space: %.2f%% < %.2f%%", deadSpacePercent, d.config.DeadSpaceThreshold)
return candidate, false
}
// Check node exclusion
if d.isNodeExcluded(metric.DataNodeID) {
candidate.CanVacuum = false
candidate.Reason = "node excluded"
candidate.Reason = "node is in exclusion list"
return candidate, false
}
// Check node preference
if len(d.config.PreferredNodes) > 0 && !d.isPreferredNode(metric.DataNodeID) {
candidate.CanVacuum = false
candidate.Reason = "node not preferred"
candidate.Reason = "node not in preferred list"
return candidate, false
}
if metric.IsRebalancing {
candidate.CanVacuum = false
candidate.Reason = "volume rebalancing"
return candidate, false
}
utilization := 0
if metric.Size > 0 {
utilization = int(float64(metric.UsedSpace) * 100.0 / float64(metric.Size))
}
// Calculate fragmentation score
candidate.FragmentationScore = d.calculateFragmentationScore(metric)
candidate.CanVacuum = true
candidate.FragmentationScore = calculateFragmentationScore(deadSpacePercent, float64(utilization))
candidate.Reason = "eligible for vacuum"
return candidate, true
}
// isNodeExcluded checks if a node is excluded
// isNodeExcluded checks if a node is in the exclusion list
func (d *Detector) isNodeExcluded(nodeID string) bool {
for _, excluded := range d.config.ExcludeNodes {
if excluded == nodeID {
@@ -137,10 +122,45 @@ func (d *Detector) isNodeExcluded(nodeID string) bool {
return false
}
// isPreferredNode checks if a node is preferred
// isPreferredNode checks if a node is in the preferred list
func (d *Detector) isPreferredNode(nodeID string) bool {
for _, preferred := range d.config.PreferredNodes {
if preferred == nodeID {
return true
}
}
return false
}
// calculateFragmentationScore calculates how fragmented a volume is
func (d *Detector) calculateFragmentationScore(metric *VolumeMetric) float32 {
if metric.Size == 0 {
return 0
}
deadSpace := metric.Size - metric.UsedSpace
return float32(deadSpace) * 100 / float32(metric.Size)
}
// VolumeMetric contains volume statistics
type VolumeMetric struct {
VolumeID uint32
DataNodeID string
Size uint64
UsedSpace uint64
FileCount int64
LastVacuumTime int64
RackID string
Collection string
}
// SortByFragmentation sorts candidates by fragmentation score
func SortByFragmentation(candidates []*VacuumCandidate) {
// Simple bubble sort for demonstration
for i := 0; i < len(candidates); i++ {
for j := i + 1; j < len(candidates); j++ {
if candidates[j].FragmentationScore > candidates[i].FragmentationScore {
candidates[i], candidates[j] = candidates[j], candidates[i]
}
}
}
}

View File

@@ -11,22 +11,22 @@ import (
type ExecutionStatus string
const (
StatusAnalyzing ExecutionStatus = "analyzing"
StatusDefragment ExecutionStatus = "defragmenting"
StatusOptimizing ExecutionStatus = "optimizing"
StatusVerifying ExecutionStatus = "verifying"
StatusCompleted ExecutionStatus = "completed"
StatusFailed ExecutionStatus = "failed"
StatusAnalyzing ExecutionStatus = "analyzing"
StatusDefragmenting ExecutionStatus = "defragmenting"
StatusOptimizing ExecutionStatus = "optimizing"
StatusVerifying ExecutionStatus = "verifying"
StatusCompleted ExecutionStatus = "completed"
StatusFailed ExecutionStatus = "failed"
)
// ExecutionStep represents a step in the vacuum pipeline
type ExecutionStep struct {
Name string
Status ExecutionStatus
StartTime *time.Time
EndTime *time.Time
Progress float32
ErrorMsg string
Name string
Status ExecutionStatus
StartTime *time.Time
EndTime *time.Time
Progress float32
ErrorMsg string
}
// Executor handles vacuum execution
@@ -36,22 +36,20 @@ type Executor struct {
// ExecutorConfig contains executor configuration
type ExecutorConfig struct {
MinVolumeSize uint64
MaxVolumeSize uint64
TargetUtilization int
TimeoutPerStep time.Duration
MaxRetries int
MinVolumeSize uint64
MaxVolumeSize uint64
TimeoutPerStep time.Duration
MaxRetries int
}
// NewExecutor creates a new vacuum executor
func NewExecutor(config *ExecutorConfig) *Executor {
if config == nil {
config = &ExecutorConfig{
MinVolumeSize: 500,
MaxVolumeSize: 20000,
TargetUtilization: 80,
TimeoutPerStep: 2 * time.Hour,
MaxRetries: 2,
MinVolumeSize: 500,
MaxVolumeSize: 5000,
TimeoutPerStep: 3 * time.Minute,
MaxRetries: 3,
}
}
return &Executor{config: config}
@@ -59,32 +57,34 @@ func NewExecutor(config *ExecutorConfig) *Executor {
// VacuumExecutionResult contains the result of vacuum operation
type VacuumExecutionResult struct {
VolumeID uint32
Success bool
StartTime time.Time
EndTime time.Time
TotalDuration time.Duration
BytesProcessed uint64
BytesFreed uint64
FragmentationBefore float64
FragmentationAfter float64
Metadata map[string]string
Steps []*ExecutionStep
ErrorMessage string
VolumeID uint32
Success bool
StartTime time.Time
EndTime time.Time
TotalDuration time.Duration
SpaceFreed uint64
FilesMoved int64
FragmentsBefore int64
FragmentsAfter int64
Metadata map[string]string
Steps []*ExecutionStep
ErrorMessage string
}
// ExecuteJob executes the vacuum operation for a volume
func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutionResult, error) {
result := &VacuumExecutionResult{
Success: false,
StartTime: time.Now(),
Metadata: make(map[string]string),
Steps: make([]*ExecutionStep, 0),
Success: false,
StartTime: time.Now(),
Metadata: make(map[string]string),
Steps: make([]*ExecutionStep, 0),
}
// Extract volume ID from payload
volumeID := extractVolumeID(job.Payload)
result.VolumeID = volumeID
// Step 1: Analyze fragmentation
if err := e.analyzeFragmentation(result); err != nil {
result.ErrorMessage = fmt.Sprintf("analysis failed: %v", err)
result.EndTime = time.Now()
@@ -92,20 +92,23 @@ func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutio
return result, err
}
// Step 2: Defragment volume
if err := e.defragmentVolume(result); err != nil {
result.ErrorMessage = fmt.Sprintf("defragmentation failed: %v", err)
result.ErrorMessage = fmt.Sprintf("defragment failed: %v", err)
result.EndTime = time.Now()
result.TotalDuration = result.EndTime.Sub(result.StartTime)
return result, err
}
// Step 3: Optimize storage
if err := e.optimizeStorage(result); err != nil {
result.ErrorMessage = fmt.Sprintf("optimization failed: %v", err)
result.ErrorMessage = fmt.Sprintf("optimize failed: %v", err)
result.EndTime = time.Now()
result.TotalDuration = result.EndTime.Sub(result.StartTime)
return result, err
}
// Step 4: Verify result
if err := e.verifyResult(result); err != nil {
result.ErrorMessage = fmt.Sprintf("verification failed: %v", err)
result.EndTime = time.Now()
@@ -116,10 +119,11 @@ func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutio
result.Success = true
result.EndTime = time.Now()
result.TotalDuration = result.EndTime.Sub(result.StartTime)
return result, nil
}
// analyzeFragmentation analyzes volume fragmentation
// analyzeFragmentation analyzes the volume fragmentation
func (e *Executor) analyzeFragmentation(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "analyzing",
@@ -129,50 +133,44 @@ func (e *Executor) analyzeFragmentation(result *VacuumExecutionResult) error {
now := time.Now()
step.StartTime = &now
for i := 0; i < 5; i++ {
time.Sleep(20 * time.Millisecond)
step.Progress = float32((i + 1) * 20)
}
result.FragmentationBefore = 35.5
result.Metadata["fragmentation_before"] = fmt.Sprintf("%.1f%%", result.FragmentationBefore)
// Simulate analysis
time.Sleep(50 * time.Millisecond)
result.FragmentsBefore = 1500
step.Progress = 100
stepEnd := time.Now()
step.EndTime = &stepEnd
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// defragmentVolume performs the actual defragmentation
// defragmentVolume defragments the volume
func (e *Executor) defragmentVolume(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "defragmenting",
Status: StatusDefragment,
Status: StatusDefragmenting,
Progress: 0,
}
now := time.Now()
step.StartTime = &now
chunks := 10
for i := 0; i < chunks; i++ {
time.Sleep(50 * time.Millisecond)
step.Progress = float32((i + 1) * 100 / chunks)
// Simulate defragmentation with progress
for i := 0; i < 10; i++ {
time.Sleep(20 * time.Millisecond)
step.Progress = float32((i + 1) * 10)
}
result.BytesProcessed = 5000000
result.BytesFreed = 1500000
result.Metadata["bytes_processed"] = fmt.Sprintf("%d", result.BytesProcessed)
result.Metadata["bytes_freed"] = fmt.Sprintf("%d", result.BytesFreed)
result.FilesMoved = 850
result.SpaceFreed = 1000000
step.Progress = 100
stepEnd := time.Now()
step.EndTime = &stepEnd
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// optimizeStorage optimizes the storage layout
// optimizeStorage optimizes storage layout
func (e *Executor) optimizeStorage(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "optimizing",
@@ -182,5 +180,68 @@ func (e *Executor) optimizeStorage(result *VacuumExecutionResult) error {
now := time.Now()
step.StartTime = &now
for i := 0; i < 8; i++ {
// Simulate optimization
for i := 0; i < 5; i++ {
time.Sleep(30 * time.Millisecond)
step.Progress = float32((i + 1) * 20)
}
step.Progress = 100
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// verifyResult verifies the operation result
func (e *Executor) verifyResult(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "verifying",
Status: StatusVerifying,
Progress: 0,
}
now := time.Now()
step.StartTime = &now
// Simulate verification
time.Sleep(50 * time.Millisecond)
result.FragmentsAfter = 320
result.Metadata["space_freed_mb"] = "1"
result.Metadata["files_moved"] = "850"
result.Metadata["fragmentation_reduction"] = "78.7%"
step.Progress = 100
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// extractVolumeID extracts the volume ID from job payload
func extractVolumeID(payload *plugin_pb.JobPayload) uint32 {
if payload == nil || len(payload.Data) < 4 {
return 0
}
return uint32(payload.Data[0]) |
(uint32(payload.Data[1]) << 8) |
(uint32(payload.Data[2]) << 16) |
(uint32(payload.Data[3]) << 24)
}
// ValidateExecutionResult validates the result of execution
func ValidateExecutionResult(result *VacuumExecutionResult) bool {
if !result.Success {
return false
}
if result.EndTime.Before(result.StartTime) {
return false
}
if len(result.Steps) != 4 {
return false
}
return true
}

View File

@@ -14,19 +14,19 @@ type ConfigurationSchema struct {
// AdminConfigSchema defines admin-side configuration
type AdminConfigSchema struct {
VacuumInterval ConfigField `json:"vacuum_interval"`
MaxConcurrentJobs ConfigField `json:"max_concurrent_jobs"`
JobTimeout ConfigField `json:"job_timeout"`
HealthCheckInterval ConfigField `json:"health_check_interval"`
DeadSpaceThreshold ConfigField `json:"dead_space_threshold"`
VacuumInterval ConfigField `json:"vacuum_interval"`
MaxConcurrentJobs ConfigField `json:"max_concurrent_jobs"`
JobTimeout ConfigField `json:"job_timeout"`
HealthCheckInterval ConfigField `json:"health_check_interval"`
DeadSpaceThreshold ConfigField `json:"dead_space_threshold"`
}
// WorkerConfigSchema defines worker-side configuration
type WorkerConfigSchema struct {
MinVolumeSize ConfigField `json:"min_volume_size"`
MaxVolumeSize ConfigField `json:"max_volume_size"`
TargetUtilization ConfigField `json:"target_utilization"`
BatchSize ConfigField `json:"batch_size"`
MinVolumeSize ConfigField `json:"min_volume_size"`
MaxVolumeSize ConfigField `json:"max_volume_size"`
TargetUtilization ConfigField `json:"target_utilization"`
BatchSize ConfigField `json:"batch_size"`
}
// ConfigField describes a configuration field
@@ -48,10 +48,10 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig {
AdminConfig: AdminConfigSchema{
VacuumInterval: ConfigField{
Name: "vacuum_interval",
Description: "Time between vacuum detection scans",
Description: "Time between vacuum operations",
Type: "duration",
Required: true,
Default: "6h",
Default: "4h",
Min: "1h",
Max: "24h",
Unit: "seconds",
@@ -72,7 +72,7 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig {
Required: true,
Default: "8h",
Min: "1h",
Max: "48h",
Max: "24h",
Unit: "seconds",
},
HealthCheckInterval: ConfigField{
@@ -80,19 +80,19 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig {
Description: "Health check interval",
Type: "duration",
Required: true,
Default: "1m",
Min: "10s",
Max: "10m",
Default: "30s",
Min: "5s",
Max: "5m",
Unit: "seconds",
},
DeadSpaceThreshold: ConfigField{
Name: "dead_space_threshold",
Description: "Percentage of dead space to trigger vacuum",
Description: "Dead space percentage threshold for vacuum",
Type: "integer",
Required: true,
Default: 30,
Min: 5,
Max: 90,
Max: 95,
Unit: "percent",
},
},
@@ -103,7 +103,7 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig {
Type: "integer",
Required: true,
Default: 500,
Min: 50,
Min: 100,
Unit: "MB",
},
MaxVolumeSize: ConfigField{
@@ -111,13 +111,13 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig {
Description: "Maximum volume size to consider for vacuum",
Type: "integer",
Required: true,
Default: 20000,
Max: 100000,
Default: 5000,
Max: 50000,
Unit: "MB",
},
TargetUtilization: ConfigField{
Name: "target_utilization",
Description: "Target storage utilization after vacuum",
Description: "Target volume utilization after vacuum",
Type: "integer",
Required: true,
Default: 80,
@@ -129,3 +129,50 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig {
Name: "batch_size",
Description: "Number of volumes to process in a batch",
Type: "integer",
Required: true,
Default: 10,
Min: 1,
Max: 100,
},
},
}
data, _ := json.MarshalIndent(schema, "", " ")
return &plugin_pb.PluginConfig{
PluginId: "vacuum-plugin",
Properties: map[string]string{
"schema": string(data),
"vacuum_interval": "4h",
"max_concurrent_jobs": "3",
"job_timeout": "8h",
"health_check_interval": "30s",
"dead_space_threshold": "30",
"min_volume_size": "500",
"max_volume_size": "5000",
"target_utilization": "80",
"batch_size": "10",
},
}
}
// DefaultAdminConfig returns default admin configuration
func DefaultAdminConfig() map[string]string {
return map[string]string{
"vacuum_interval": "4h",
"max_concurrent_jobs": "3",
"job_timeout": "8h",
"health_check_interval": "30s",
"dead_space_threshold": "30",
}
}
// DefaultWorkerConfig returns default worker configuration
func DefaultWorkerConfig() map[string]string {
return map[string]string{
"min_volume_size": "500",
"max_volume_size": "5000",
"target_utilization": "80",
"batch_size": "10",
}
}

View File

@@ -15,19 +15,18 @@ import (
// WorkerConfig holds worker-specific configuration
type WorkerConfig struct {
WorkerID string
AdminHost string
AdminPort int
PluginPort int
MinVolumeSize uint64
MaxVolumeSize uint64
TargetUtilization int
BatchSize int
VacuumInterval time.Duration
MaxConcurrentJobs int
HealthCheckInterval time.Duration
JobTimeout time.Duration
DeadSpaceThreshold int
WorkerID string
AdminHost string
AdminPort int
PluginPort int
MinVolumeSize uint64
MaxVolumeSize uint64
TargetUtilization int
BatchSize int
VacuumInterval time.Duration
MaxConcurrentJobs int
HealthCheckInterval time.Duration
DeadSpaceThreshold int
}
// Worker represents the vacuum plugin worker
@@ -55,31 +54,34 @@ func NewWorker(config *WorkerConfig) *Worker {
func (w *Worker) Start(ctx context.Context) error {
log.Printf("Starting vacuum worker: %s", w.config.WorkerID)
// Connect to admin server
if err := w.connectToAdmin(ctx); err != nil {
return fmt.Errorf("failed to connect to admin: %v", err)
}
// Initialize detector
w.detector = NewDetector(DetectionOptions{
MinVolumeSize: w.config.MinVolumeSize,
MaxVolumeSize: w.config.MaxVolumeSize,
DeadSpaceThreshold: w.config.DeadSpaceThreshold,
TargetUtilization: w.config.TargetUtilization,
DeadSpaceThreshold: float32(w.config.DeadSpaceThreshold),
})
// Initialize executor
w.executor = NewExecutor(&ExecutorConfig{
MinVolumeSize: w.config.MinVolumeSize,
MaxVolumeSize: w.config.MaxVolumeSize,
TargetUtilization: w.config.TargetUtilization,
TimeoutPerStep: w.config.JobTimeout / 4,
MaxRetries: 2,
MinVolumeSize: w.config.MinVolumeSize,
MaxVolumeSize: w.config.MaxVolumeSize,
TimeoutPerStep: 3 * time.Minute,
MaxRetries: 3,
})
// Register with admin
if err := w.registerPlugin(ctx); err != nil {
return fmt.Errorf("failed to register: %v", err)
}
w.isRunning = true
// Start background goroutines
go w.heartbeatLoop(ctx)
log.Printf("Vacuum worker started successfully")
@@ -100,6 +102,7 @@ func (w *Worker) connectToAdmin(ctx context.Context) error {
w.conn = conn
w.pluginClient = plugin_pb.NewPluginServiceClient(conn)
return nil
}
@@ -117,25 +120,27 @@ func (w *Worker) registerPlugin(ctx context.Context) error {
Port: int32(w.config.PluginPort),
}
// Add capabilities detail
req.CapabilitiesDetail = &plugin_pb.PluginCapabilities{
Detection: []*plugin_pb.DetectionCapability{
{
Type: "vacuum_candidates",
Description: "Detect volumes eligible for vacuuming",
Description: "Detect volumes eligible for vacuum",
MinIntervalSeconds: int32(w.config.VacuumInterval.Seconds()),
RequiresFullScan: true,
RequiresFullScan: false,
},
},
Maintenance: []*plugin_pb.MaintenanceCapability{
{
Type: "vacuum_volume",
Description: "Vacuum and defragment a volume",
RequiredDetectionTypes: []string{"vacuum_candidates"},
EstimatedDurationSeconds: int32(w.config.JobTimeout.Seconds()),
Type: "vacuum_volume",
Description: "Vacuum a volume to free dead space",
RequiredDetectionTypes: []string{"vacuum_candidates"},
EstimatedDurationSeconds: 1800,
},
},
}
// Add schema to metadata
if schema != nil {
if req.Metadata == nil {
req.Metadata = make(map[string]string)
@@ -198,16 +203,7 @@ func (w *Worker) sendHealthReport(ctx context.Context) {
// ExecuteDetection performs detection for vacuum candidates
func (w *Worker) ExecuteDetection(ctx context.Context, volumeMetrics map[uint32]*VolumeMetric) ([]*VacuumCandidate, error) {
candidates, err := w.detector.DetectJobs(volumeMetrics)
if err != nil {
return nil, err
}
if len(candidates) > w.config.BatchSize {
candidates = candidates[:w.config.BatchSize]
}
return candidates, nil
return w.detector.DetectJobs(volumeMetrics)
}
// ExecuteJob executes a vacuum job
@@ -223,10 +219,8 @@ func (w *Worker) ExecuteJob(ctx context.Context, jobID string, payload *plugin_p
defer delete(w.activeJobs, jobID)
jobCtx, cancel := context.WithTimeout(ctx, w.config.JobTimeout)
defer cancel()
result, err := w.executeJobWithContext(jobCtx, req)
// Execute the job
result, err := w.executor.ExecuteJob(req)
if err != nil {
log.Printf("Job execution failed: %v", err)
return err
@@ -241,30 +235,6 @@ func (w *Worker) ExecuteJob(ctx context.Context, jobID string, payload *plugin_p
return fmt.Errorf(result.ErrorMessage)
}
// executeJobWithContext executes a job with context
func (w *Worker) executeJobWithContext(ctx context.Context, req *plugin_pb.ExecuteJobRequest) (*VacuumExecutionResult, error) {
done := make(chan *VacuumExecutionResult, 1)
errChan := make(chan error, 1)
go func() {
result, err := w.executor.ExecuteJob(req)
if err != nil {
errChan <- err
} else {
done <- result
}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-done:
return result, nil
case err := <-errChan:
return nil, err
}
}
// submitResult submits job results to admin
func (w *Worker) submitResult(ctx context.Context, jobID string, result *VacuumExecutionResult) error {
jobResult := &plugin_pb.JobResult{
@@ -273,12 +243,12 @@ func (w *Worker) submitResult(ctx context.Context, jobID string, result *VacuumE
}
req := &plugin_pb.JobResultRequest{
JobId: jobID,
JobType: "vacuum_volume",
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED,
Message: "Vacuum completed successfully",
Result: jobResult,
RetryCountUsed: 0,
JobId: jobID,
JobType: "vacuum_volume",
Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED,
Message: "Vacuum completed successfully",
Result: jobResult,
RetryCountUsed: 0,
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
@@ -314,19 +284,18 @@ func (w *Worker) GetStatus() map[string]interface{} {
// ParseFlags parses command line flags for vacuum worker
func ParseFlags() *WorkerConfig {
config := &WorkerConfig{
WorkerID: "vacuum-worker-1",
AdminHost: "localhost",
AdminPort: 50051,
PluginPort: 50053,
MinVolumeSize: 500,
MaxVolumeSize: 20000,
TargetUtilization: 80,
BatchSize: 5,
VacuumInterval: 6 * time.Hour,
MaxConcurrentJobs: 3,
HealthCheckInterval: 1 * time.Minute,
JobTimeout: 8 * time.Hour,
DeadSpaceThreshold: 30,
WorkerID: "vacuum-worker-1",
AdminHost: "localhost",
AdminPort: 50051,
PluginPort: 50053,
MinVolumeSize: 500,
MaxVolumeSize: 5000,
TargetUtilization: 80,
BatchSize: 10,
VacuumInterval: 4 * time.Hour,
MaxConcurrentJobs: 3,
HealthCheckInterval: 30 * time.Second,
DeadSpaceThreshold: 30,
}
flag.StringVar(&config.WorkerID, "worker-id", config.WorkerID, "Worker ID")
@@ -335,13 +304,12 @@ func ParseFlags() *WorkerConfig {
flag.IntVar(&config.PluginPort, "plugin-port", config.PluginPort, "Plugin server port")
flag.Uint64Var(&config.MinVolumeSize, "min-volume-size", config.MinVolumeSize, "Minimum volume size in MB")
flag.Uint64Var(&config.MaxVolumeSize, "max-volume-size", config.MaxVolumeSize, "Maximum volume size in MB")
flag.IntVar(&config.TargetUtilization, "target-utilization", config.TargetUtilization, "Target utilization percent")
flag.IntVar(&config.BatchSize, "batch-size", config.BatchSize, "Batch size for vacuum jobs")
flag.IntVar(&config.TargetUtilization, "target-utilization", config.TargetUtilization, "Target utilization percentage")
flag.IntVar(&config.BatchSize, "batch-size", config.BatchSize, "Batch size")
flag.DurationVar(&config.VacuumInterval, "vacuum-interval", config.VacuumInterval, "Vacuum interval")
flag.IntVar(&config.MaxConcurrentJobs, "max-concurrent-jobs", config.MaxConcurrentJobs, "Max concurrent jobs")
flag.DurationVar(&config.HealthCheckInterval, "health-check-interval", config.HealthCheckInterval, "Health check interval")
flag.DurationVar(&config.JobTimeout, "job-timeout", config.JobTimeout, "Job timeout")
flag.IntVar(&config.DeadSpaceThreshold, "dead-space-threshold", config.DeadSpaceThreshold, "Dead space threshold percent")
flag.IntVar(&config.DeadSpaceThreshold, "dead-space-threshold", config.DeadSpaceThreshold, "Dead space threshold percentage")
flag.Parse()
@@ -360,9 +328,3 @@ func (w *Worker) ListenAndServe(port int) error {
log.Printf("Worker listening on port %d", port)
return server.Serve(listener)
}
// GetActiveJobIDs returns a list of active job IDs
func (w *Worker) GetActiveJobIDs() []string {
ids := make([]string, 0, len(w.activeJobs))
for id := range w.activeJobs {
ids = append(ids, id)