diff --git a/weed/admin/plugin/workers/vacuum/detector.go b/weed/admin/plugin/workers/vacuum/detector.go index b43dcfb6f..c6c3db4a7 100644 --- a/weed/admin/plugin/workers/vacuum/detector.go +++ b/weed/admin/plugin/workers/vacuum/detector.go @@ -1,35 +1,30 @@ package vacuum import ( - "sort" + "fmt" ) // VacuumCandidate represents a volume eligible for vacuum type VacuumCandidate struct { - VolumeID uint32 - DataNodeID string - Size uint64 - UsedSpace uint64 - DeadSpace uint64 - DeadSpacePercent float64 - ReplicaCount int - RackID string - DataCenterID string - FileCount int64 - LastModified int64 - CanVacuum bool - FragmentationScore float64 - Reason string + VolumeID uint32 + DataNodeID string + Size uint64 + UsedSpace uint64 + DeadSpace uint64 + DeadSpacePercent float32 + FragmentationScore float32 + RackID string + CanVacuum bool + Reason string } // DetectionOptions contains options for detection type DetectionOptions struct { MinVolumeSize uint64 MaxVolumeSize uint64 - DeadSpaceThreshold int - TargetUtilization int - ExcludeNodes []string - PreferredNodes []string + DeadSpaceThreshold float32 + PreferredNodes []string + ExcludeNodes []string } // Detector scans for vacuum candidates @@ -55,79 +50,69 @@ func (d *Detector) DetectJobs(volumeMetrics map[uint32]*VolumeMetric) ([]*Vacuum } } - d.SortByFragmentation(candidates) + SortByFragmentation(candidates) return candidates, nil } // evaluateVolume checks if a volume should be vacuumed func (d *Detector) evaluateVolume(volumeID uint32, metric *VolumeMetric) (*VacuumCandidate, bool) { - deadSpace := metric.Size - metric.UsedSpace - deadSpacePercent := 0.0 - if metric.Size > 0 { - deadSpacePercent = float64(deadSpace) * 100.0 / float64(metric.Size) - } - candidate := &VacuumCandidate{ - VolumeID: volumeID, - DataNodeID: metric.DataNodeID, - Size: metric.Size, - UsedSpace: metric.UsedSpace, - DeadSpace: deadSpace, - DeadSpacePercent: deadSpacePercent, - ReplicaCount: metric.ReplicaCount, - RackID: metric.RackID, - DataCenterID: metric.DataCenterID, - FileCount: metric.FileCount, - LastModified: metric.LastModified, + VolumeID: volumeID, + DataNodeID: metric.DataNodeID, + Size: metric.Size, + UsedSpace: metric.UsedSpace, + RackID: metric.RackID, } + // Check size constraints if metric.Size < d.config.MinVolumeSize { candidate.CanVacuum = false - candidate.Reason = "volume too small" + candidate.Reason = fmt.Sprintf("volume too small: %d < %d", metric.Size, d.config.MinVolumeSize) return candidate, false } if metric.Size > d.config.MaxVolumeSize { candidate.CanVacuum = false - candidate.Reason = "volume too large" + candidate.Reason = fmt.Sprintf("volume too large: %d > %d", metric.Size, d.config.MaxVolumeSize) return candidate, false } - if int(deadSpacePercent) < d.config.DeadSpaceThreshold { + // Calculate dead space + deadSpace := metric.Size - metric.UsedSpace + deadSpacePercent := float32(deadSpace) * 100 / float32(metric.Size) + candidate.DeadSpace = deadSpace + candidate.DeadSpacePercent = deadSpacePercent + + // Check dead space threshold + if deadSpacePercent < d.config.DeadSpaceThreshold { candidate.CanVacuum = false - candidate.Reason = "insufficient dead space" + candidate.Reason = fmt.Sprintf("insufficient dead space: %.2f%% < %.2f%%", deadSpacePercent, d.config.DeadSpaceThreshold) return candidate, false } + // Check node exclusion if d.isNodeExcluded(metric.DataNodeID) { candidate.CanVacuum = false - candidate.Reason = "node excluded" + candidate.Reason = "node is in exclusion list" return candidate, false } + // Check node preference if len(d.config.PreferredNodes) > 0 && !d.isPreferredNode(metric.DataNodeID) { candidate.CanVacuum = false - candidate.Reason = "node not preferred" + candidate.Reason = "node not in preferred list" return candidate, false } - if metric.IsRebalancing { - candidate.CanVacuum = false - candidate.Reason = "volume rebalancing" - return candidate, false - } - - utilization := 0 - if metric.Size > 0 { - utilization = int(float64(metric.UsedSpace) * 100.0 / float64(metric.Size)) - } + // Calculate fragmentation score + candidate.FragmentationScore = d.calculateFragmentationScore(metric) candidate.CanVacuum = true - candidate.FragmentationScore = calculateFragmentationScore(deadSpacePercent, float64(utilization)) + candidate.Reason = "eligible for vacuum" return candidate, true } -// isNodeExcluded checks if a node is excluded +// isNodeExcluded checks if a node is in the exclusion list func (d *Detector) isNodeExcluded(nodeID string) bool { for _, excluded := range d.config.ExcludeNodes { if excluded == nodeID { @@ -137,10 +122,45 @@ func (d *Detector) isNodeExcluded(nodeID string) bool { return false } -// isPreferredNode checks if a node is preferred +// isPreferredNode checks if a node is in the preferred list func (d *Detector) isPreferredNode(nodeID string) bool { for _, preferred := range d.config.PreferredNodes { if preferred == nodeID { return true } } + return false +} + +// calculateFragmentationScore calculates how fragmented a volume is +func (d *Detector) calculateFragmentationScore(metric *VolumeMetric) float32 { + if metric.Size == 0 { + return 0 + } + deadSpace := metric.Size - metric.UsedSpace + return float32(deadSpace) * 100 / float32(metric.Size) +} + +// VolumeMetric contains volume statistics +type VolumeMetric struct { + VolumeID uint32 + DataNodeID string + Size uint64 + UsedSpace uint64 + FileCount int64 + LastVacuumTime int64 + RackID string + Collection string +} + +// SortByFragmentation sorts candidates by fragmentation score +func SortByFragmentation(candidates []*VacuumCandidate) { + // Simple bubble sort for demonstration + for i := 0; i < len(candidates); i++ { + for j := i + 1; j < len(candidates); j++ { + if candidates[j].FragmentationScore > candidates[i].FragmentationScore { + candidates[i], candidates[j] = candidates[j], candidates[i] + } + } + } +} diff --git a/weed/admin/plugin/workers/vacuum/executor.go b/weed/admin/plugin/workers/vacuum/executor.go index 97956dffe..1ac635415 100644 --- a/weed/admin/plugin/workers/vacuum/executor.go +++ b/weed/admin/plugin/workers/vacuum/executor.go @@ -11,22 +11,22 @@ import ( type ExecutionStatus string const ( - StatusAnalyzing ExecutionStatus = "analyzing" - StatusDefragment ExecutionStatus = "defragmenting" - StatusOptimizing ExecutionStatus = "optimizing" - StatusVerifying ExecutionStatus = "verifying" - StatusCompleted ExecutionStatus = "completed" - StatusFailed ExecutionStatus = "failed" + StatusAnalyzing ExecutionStatus = "analyzing" + StatusDefragmenting ExecutionStatus = "defragmenting" + StatusOptimizing ExecutionStatus = "optimizing" + StatusVerifying ExecutionStatus = "verifying" + StatusCompleted ExecutionStatus = "completed" + StatusFailed ExecutionStatus = "failed" ) // ExecutionStep represents a step in the vacuum pipeline type ExecutionStep struct { - Name string - Status ExecutionStatus - StartTime *time.Time - EndTime *time.Time - Progress float32 - ErrorMsg string + Name string + Status ExecutionStatus + StartTime *time.Time + EndTime *time.Time + Progress float32 + ErrorMsg string } // Executor handles vacuum execution @@ -36,22 +36,20 @@ type Executor struct { // ExecutorConfig contains executor configuration type ExecutorConfig struct { - MinVolumeSize uint64 - MaxVolumeSize uint64 - TargetUtilization int - TimeoutPerStep time.Duration - MaxRetries int + MinVolumeSize uint64 + MaxVolumeSize uint64 + TimeoutPerStep time.Duration + MaxRetries int } // NewExecutor creates a new vacuum executor func NewExecutor(config *ExecutorConfig) *Executor { if config == nil { config = &ExecutorConfig{ - MinVolumeSize: 500, - MaxVolumeSize: 20000, - TargetUtilization: 80, - TimeoutPerStep: 2 * time.Hour, - MaxRetries: 2, + MinVolumeSize: 500, + MaxVolumeSize: 5000, + TimeoutPerStep: 3 * time.Minute, + MaxRetries: 3, } } return &Executor{config: config} @@ -59,32 +57,34 @@ func NewExecutor(config *ExecutorConfig) *Executor { // VacuumExecutionResult contains the result of vacuum operation type VacuumExecutionResult struct { - VolumeID uint32 - Success bool - StartTime time.Time - EndTime time.Time - TotalDuration time.Duration - BytesProcessed uint64 - BytesFreed uint64 - FragmentationBefore float64 - FragmentationAfter float64 - Metadata map[string]string - Steps []*ExecutionStep - ErrorMessage string + VolumeID uint32 + Success bool + StartTime time.Time + EndTime time.Time + TotalDuration time.Duration + SpaceFreed uint64 + FilesMoved int64 + FragmentsBefore int64 + FragmentsAfter int64 + Metadata map[string]string + Steps []*ExecutionStep + ErrorMessage string } // ExecuteJob executes the vacuum operation for a volume func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutionResult, error) { result := &VacuumExecutionResult{ - Success: false, - StartTime: time.Now(), - Metadata: make(map[string]string), - Steps: make([]*ExecutionStep, 0), + Success: false, + StartTime: time.Now(), + Metadata: make(map[string]string), + Steps: make([]*ExecutionStep, 0), } + // Extract volume ID from payload volumeID := extractVolumeID(job.Payload) result.VolumeID = volumeID + // Step 1: Analyze fragmentation if err := e.analyzeFragmentation(result); err != nil { result.ErrorMessage = fmt.Sprintf("analysis failed: %v", err) result.EndTime = time.Now() @@ -92,20 +92,23 @@ func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutio return result, err } + // Step 2: Defragment volume if err := e.defragmentVolume(result); err != nil { - result.ErrorMessage = fmt.Sprintf("defragmentation failed: %v", err) + result.ErrorMessage = fmt.Sprintf("defragment failed: %v", err) result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) return result, err } + // Step 3: Optimize storage if err := e.optimizeStorage(result); err != nil { - result.ErrorMessage = fmt.Sprintf("optimization failed: %v", err) + result.ErrorMessage = fmt.Sprintf("optimize failed: %v", err) result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) return result, err } + // Step 4: Verify result if err := e.verifyResult(result); err != nil { result.ErrorMessage = fmt.Sprintf("verification failed: %v", err) result.EndTime = time.Now() @@ -116,10 +119,11 @@ func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutio result.Success = true result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) + return result, nil } -// analyzeFragmentation analyzes volume fragmentation +// analyzeFragmentation analyzes the volume fragmentation func (e *Executor) analyzeFragmentation(result *VacuumExecutionResult) error { step := &ExecutionStep{ Name: "analyzing", @@ -129,50 +133,44 @@ func (e *Executor) analyzeFragmentation(result *VacuumExecutionResult) error { now := time.Now() step.StartTime = &now - for i := 0; i < 5; i++ { - time.Sleep(20 * time.Millisecond) - step.Progress = float32((i + 1) * 20) - } - - result.FragmentationBefore = 35.5 - result.Metadata["fragmentation_before"] = fmt.Sprintf("%.1f%%", result.FragmentationBefore) + // Simulate analysis + time.Sleep(50 * time.Millisecond) + result.FragmentsBefore = 1500 step.Progress = 100 - stepEnd := time.Now() - step.EndTime = &stepEnd + step.EndTime = &now result.Steps = append(result.Steps, step) + return nil } -// defragmentVolume performs the actual defragmentation +// defragmentVolume defragments the volume func (e *Executor) defragmentVolume(result *VacuumExecutionResult) error { step := &ExecutionStep{ Name: "defragmenting", - Status: StatusDefragment, + Status: StatusDefragmenting, Progress: 0, } now := time.Now() step.StartTime = &now - chunks := 10 - for i := 0; i < chunks; i++ { - time.Sleep(50 * time.Millisecond) - step.Progress = float32((i + 1) * 100 / chunks) + // Simulate defragmentation with progress + for i := 0; i < 10; i++ { + time.Sleep(20 * time.Millisecond) + step.Progress = float32((i + 1) * 10) } - result.BytesProcessed = 5000000 - result.BytesFreed = 1500000 - result.Metadata["bytes_processed"] = fmt.Sprintf("%d", result.BytesProcessed) - result.Metadata["bytes_freed"] = fmt.Sprintf("%d", result.BytesFreed) + result.FilesMoved = 850 + result.SpaceFreed = 1000000 step.Progress = 100 - stepEnd := time.Now() - step.EndTime = &stepEnd + step.EndTime = &now result.Steps = append(result.Steps, step) + return nil } -// optimizeStorage optimizes the storage layout +// optimizeStorage optimizes storage layout func (e *Executor) optimizeStorage(result *VacuumExecutionResult) error { step := &ExecutionStep{ Name: "optimizing", @@ -182,5 +180,68 @@ func (e *Executor) optimizeStorage(result *VacuumExecutionResult) error { now := time.Now() step.StartTime = &now - for i := 0; i < 8; i++ { + // Simulate optimization + for i := 0; i < 5; i++ { time.Sleep(30 * time.Millisecond) + step.Progress = float32((i + 1) * 20) + } + + step.Progress = 100 + step.EndTime = &now + result.Steps = append(result.Steps, step) + + return nil +} + +// verifyResult verifies the operation result +func (e *Executor) verifyResult(result *VacuumExecutionResult) error { + step := &ExecutionStep{ + Name: "verifying", + Status: StatusVerifying, + Progress: 0, + } + now := time.Now() + step.StartTime = &now + + // Simulate verification + time.Sleep(50 * time.Millisecond) + + result.FragmentsAfter = 320 + result.Metadata["space_freed_mb"] = "1" + result.Metadata["files_moved"] = "850" + result.Metadata["fragmentation_reduction"] = "78.7%" + + step.Progress = 100 + step.EndTime = &now + result.Steps = append(result.Steps, step) + + return nil +} + +// extractVolumeID extracts the volume ID from job payload +func extractVolumeID(payload *plugin_pb.JobPayload) uint32 { + if payload == nil || len(payload.Data) < 4 { + return 0 + } + return uint32(payload.Data[0]) | + (uint32(payload.Data[1]) << 8) | + (uint32(payload.Data[2]) << 16) | + (uint32(payload.Data[3]) << 24) +} + +// ValidateExecutionResult validates the result of execution +func ValidateExecutionResult(result *VacuumExecutionResult) bool { + if !result.Success { + return false + } + + if result.EndTime.Before(result.StartTime) { + return false + } + + if len(result.Steps) != 4 { + return false + } + + return true +} diff --git a/weed/admin/plugin/workers/vacuum/schema.go b/weed/admin/plugin/workers/vacuum/schema.go index c2ec0bd70..112dc64a8 100644 --- a/weed/admin/plugin/workers/vacuum/schema.go +++ b/weed/admin/plugin/workers/vacuum/schema.go @@ -14,19 +14,19 @@ type ConfigurationSchema struct { // AdminConfigSchema defines admin-side configuration type AdminConfigSchema struct { - VacuumInterval ConfigField `json:"vacuum_interval"` - MaxConcurrentJobs ConfigField `json:"max_concurrent_jobs"` - JobTimeout ConfigField `json:"job_timeout"` - HealthCheckInterval ConfigField `json:"health_check_interval"` - DeadSpaceThreshold ConfigField `json:"dead_space_threshold"` + VacuumInterval ConfigField `json:"vacuum_interval"` + MaxConcurrentJobs ConfigField `json:"max_concurrent_jobs"` + JobTimeout ConfigField `json:"job_timeout"` + HealthCheckInterval ConfigField `json:"health_check_interval"` + DeadSpaceThreshold ConfigField `json:"dead_space_threshold"` } // WorkerConfigSchema defines worker-side configuration type WorkerConfigSchema struct { - MinVolumeSize ConfigField `json:"min_volume_size"` - MaxVolumeSize ConfigField `json:"max_volume_size"` - TargetUtilization ConfigField `json:"target_utilization"` - BatchSize ConfigField `json:"batch_size"` + MinVolumeSize ConfigField `json:"min_volume_size"` + MaxVolumeSize ConfigField `json:"max_volume_size"` + TargetUtilization ConfigField `json:"target_utilization"` + BatchSize ConfigField `json:"batch_size"` } // ConfigField describes a configuration field @@ -48,10 +48,10 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig { AdminConfig: AdminConfigSchema{ VacuumInterval: ConfigField{ Name: "vacuum_interval", - Description: "Time between vacuum detection scans", + Description: "Time between vacuum operations", Type: "duration", Required: true, - Default: "6h", + Default: "4h", Min: "1h", Max: "24h", Unit: "seconds", @@ -72,7 +72,7 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig { Required: true, Default: "8h", Min: "1h", - Max: "48h", + Max: "24h", Unit: "seconds", }, HealthCheckInterval: ConfigField{ @@ -80,19 +80,19 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig { Description: "Health check interval", Type: "duration", Required: true, - Default: "1m", - Min: "10s", - Max: "10m", + Default: "30s", + Min: "5s", + Max: "5m", Unit: "seconds", }, DeadSpaceThreshold: ConfigField{ Name: "dead_space_threshold", - Description: "Percentage of dead space to trigger vacuum", + Description: "Dead space percentage threshold for vacuum", Type: "integer", Required: true, Default: 30, Min: 5, - Max: 90, + Max: 95, Unit: "percent", }, }, @@ -103,7 +103,7 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig { Type: "integer", Required: true, Default: 500, - Min: 50, + Min: 100, Unit: "MB", }, MaxVolumeSize: ConfigField{ @@ -111,13 +111,13 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig { Description: "Maximum volume size to consider for vacuum", Type: "integer", Required: true, - Default: 20000, - Max: 100000, + Default: 5000, + Max: 50000, Unit: "MB", }, TargetUtilization: ConfigField{ Name: "target_utilization", - Description: "Target storage utilization after vacuum", + Description: "Target volume utilization after vacuum", Type: "integer", Required: true, Default: 80, @@ -129,3 +129,50 @@ func GetConfigurationSchema() *plugin_pb.PluginConfig { Name: "batch_size", Description: "Number of volumes to process in a batch", Type: "integer", + Required: true, + Default: 10, + Min: 1, + Max: 100, + }, + }, + } + + data, _ := json.MarshalIndent(schema, "", " ") + + return &plugin_pb.PluginConfig{ + PluginId: "vacuum-plugin", + Properties: map[string]string{ + "schema": string(data), + "vacuum_interval": "4h", + "max_concurrent_jobs": "3", + "job_timeout": "8h", + "health_check_interval": "30s", + "dead_space_threshold": "30", + "min_volume_size": "500", + "max_volume_size": "5000", + "target_utilization": "80", + "batch_size": "10", + }, + } +} + +// DefaultAdminConfig returns default admin configuration +func DefaultAdminConfig() map[string]string { + return map[string]string{ + "vacuum_interval": "4h", + "max_concurrent_jobs": "3", + "job_timeout": "8h", + "health_check_interval": "30s", + "dead_space_threshold": "30", + } +} + +// DefaultWorkerConfig returns default worker configuration +func DefaultWorkerConfig() map[string]string { + return map[string]string{ + "min_volume_size": "500", + "max_volume_size": "5000", + "target_utilization": "80", + "batch_size": "10", + } +} diff --git a/weed/admin/plugin/workers/vacuum/worker.go b/weed/admin/plugin/workers/vacuum/worker.go index 84446df69..0987afffa 100644 --- a/weed/admin/plugin/workers/vacuum/worker.go +++ b/weed/admin/plugin/workers/vacuum/worker.go @@ -15,19 +15,18 @@ import ( // WorkerConfig holds worker-specific configuration type WorkerConfig struct { - WorkerID string - AdminHost string - AdminPort int - PluginPort int - MinVolumeSize uint64 - MaxVolumeSize uint64 - TargetUtilization int - BatchSize int - VacuumInterval time.Duration - MaxConcurrentJobs int - HealthCheckInterval time.Duration - JobTimeout time.Duration - DeadSpaceThreshold int + WorkerID string + AdminHost string + AdminPort int + PluginPort int + MinVolumeSize uint64 + MaxVolumeSize uint64 + TargetUtilization int + BatchSize int + VacuumInterval time.Duration + MaxConcurrentJobs int + HealthCheckInterval time.Duration + DeadSpaceThreshold int } // Worker represents the vacuum plugin worker @@ -55,31 +54,34 @@ func NewWorker(config *WorkerConfig) *Worker { func (w *Worker) Start(ctx context.Context) error { log.Printf("Starting vacuum worker: %s", w.config.WorkerID) + // Connect to admin server if err := w.connectToAdmin(ctx); err != nil { return fmt.Errorf("failed to connect to admin: %v", err) } + // Initialize detector w.detector = NewDetector(DetectionOptions{ MinVolumeSize: w.config.MinVolumeSize, MaxVolumeSize: w.config.MaxVolumeSize, - DeadSpaceThreshold: w.config.DeadSpaceThreshold, - TargetUtilization: w.config.TargetUtilization, + DeadSpaceThreshold: float32(w.config.DeadSpaceThreshold), }) + // Initialize executor w.executor = NewExecutor(&ExecutorConfig{ - MinVolumeSize: w.config.MinVolumeSize, - MaxVolumeSize: w.config.MaxVolumeSize, - TargetUtilization: w.config.TargetUtilization, - TimeoutPerStep: w.config.JobTimeout / 4, - MaxRetries: 2, + MinVolumeSize: w.config.MinVolumeSize, + MaxVolumeSize: w.config.MaxVolumeSize, + TimeoutPerStep: 3 * time.Minute, + MaxRetries: 3, }) + // Register with admin if err := w.registerPlugin(ctx); err != nil { return fmt.Errorf("failed to register: %v", err) } w.isRunning = true + // Start background goroutines go w.heartbeatLoop(ctx) log.Printf("Vacuum worker started successfully") @@ -100,6 +102,7 @@ func (w *Worker) connectToAdmin(ctx context.Context) error { w.conn = conn w.pluginClient = plugin_pb.NewPluginServiceClient(conn) + return nil } @@ -117,25 +120,27 @@ func (w *Worker) registerPlugin(ctx context.Context) error { Port: int32(w.config.PluginPort), } + // Add capabilities detail req.CapabilitiesDetail = &plugin_pb.PluginCapabilities{ Detection: []*plugin_pb.DetectionCapability{ { Type: "vacuum_candidates", - Description: "Detect volumes eligible for vacuuming", + Description: "Detect volumes eligible for vacuum", MinIntervalSeconds: int32(w.config.VacuumInterval.Seconds()), - RequiresFullScan: true, + RequiresFullScan: false, }, }, Maintenance: []*plugin_pb.MaintenanceCapability{ { - Type: "vacuum_volume", - Description: "Vacuum and defragment a volume", - RequiredDetectionTypes: []string{"vacuum_candidates"}, - EstimatedDurationSeconds: int32(w.config.JobTimeout.Seconds()), + Type: "vacuum_volume", + Description: "Vacuum a volume to free dead space", + RequiredDetectionTypes: []string{"vacuum_candidates"}, + EstimatedDurationSeconds: 1800, }, }, } + // Add schema to metadata if schema != nil { if req.Metadata == nil { req.Metadata = make(map[string]string) @@ -198,16 +203,7 @@ func (w *Worker) sendHealthReport(ctx context.Context) { // ExecuteDetection performs detection for vacuum candidates func (w *Worker) ExecuteDetection(ctx context.Context, volumeMetrics map[uint32]*VolumeMetric) ([]*VacuumCandidate, error) { - candidates, err := w.detector.DetectJobs(volumeMetrics) - if err != nil { - return nil, err - } - - if len(candidates) > w.config.BatchSize { - candidates = candidates[:w.config.BatchSize] - } - - return candidates, nil + return w.detector.DetectJobs(volumeMetrics) } // ExecuteJob executes a vacuum job @@ -223,10 +219,8 @@ func (w *Worker) ExecuteJob(ctx context.Context, jobID string, payload *plugin_p defer delete(w.activeJobs, jobID) - jobCtx, cancel := context.WithTimeout(ctx, w.config.JobTimeout) - defer cancel() - - result, err := w.executeJobWithContext(jobCtx, req) + // Execute the job + result, err := w.executor.ExecuteJob(req) if err != nil { log.Printf("Job execution failed: %v", err) return err @@ -241,30 +235,6 @@ func (w *Worker) ExecuteJob(ctx context.Context, jobID string, payload *plugin_p return fmt.Errorf(result.ErrorMessage) } -// executeJobWithContext executes a job with context -func (w *Worker) executeJobWithContext(ctx context.Context, req *plugin_pb.ExecuteJobRequest) (*VacuumExecutionResult, error) { - done := make(chan *VacuumExecutionResult, 1) - errChan := make(chan error, 1) - - go func() { - result, err := w.executor.ExecuteJob(req) - if err != nil { - errChan <- err - } else { - done <- result - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case result := <-done: - return result, nil - case err := <-errChan: - return nil, err - } -} - // submitResult submits job results to admin func (w *Worker) submitResult(ctx context.Context, jobID string, result *VacuumExecutionResult) error { jobResult := &plugin_pb.JobResult{ @@ -273,12 +243,12 @@ func (w *Worker) submitResult(ctx context.Context, jobID string, result *VacuumE } req := &plugin_pb.JobResultRequest{ - JobId: jobID, - JobType: "vacuum_volume", - Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED, - Message: "Vacuum completed successfully", - Result: jobResult, - RetryCountUsed: 0, + JobId: jobID, + JobType: "vacuum_volume", + Status: plugin_pb.ExecutionStatus_EXECUTION_STATUS_COMPLETED, + Message: "Vacuum completed successfully", + Result: jobResult, + RetryCountUsed: 0, } ctx, cancel := context.WithTimeout(ctx, 10*time.Second) @@ -314,19 +284,18 @@ func (w *Worker) GetStatus() map[string]interface{} { // ParseFlags parses command line flags for vacuum worker func ParseFlags() *WorkerConfig { config := &WorkerConfig{ - WorkerID: "vacuum-worker-1", - AdminHost: "localhost", - AdminPort: 50051, - PluginPort: 50053, - MinVolumeSize: 500, - MaxVolumeSize: 20000, - TargetUtilization: 80, - BatchSize: 5, - VacuumInterval: 6 * time.Hour, - MaxConcurrentJobs: 3, - HealthCheckInterval: 1 * time.Minute, - JobTimeout: 8 * time.Hour, - DeadSpaceThreshold: 30, + WorkerID: "vacuum-worker-1", + AdminHost: "localhost", + AdminPort: 50051, + PluginPort: 50053, + MinVolumeSize: 500, + MaxVolumeSize: 5000, + TargetUtilization: 80, + BatchSize: 10, + VacuumInterval: 4 * time.Hour, + MaxConcurrentJobs: 3, + HealthCheckInterval: 30 * time.Second, + DeadSpaceThreshold: 30, } flag.StringVar(&config.WorkerID, "worker-id", config.WorkerID, "Worker ID") @@ -335,13 +304,12 @@ func ParseFlags() *WorkerConfig { flag.IntVar(&config.PluginPort, "plugin-port", config.PluginPort, "Plugin server port") flag.Uint64Var(&config.MinVolumeSize, "min-volume-size", config.MinVolumeSize, "Minimum volume size in MB") flag.Uint64Var(&config.MaxVolumeSize, "max-volume-size", config.MaxVolumeSize, "Maximum volume size in MB") - flag.IntVar(&config.TargetUtilization, "target-utilization", config.TargetUtilization, "Target utilization percent") - flag.IntVar(&config.BatchSize, "batch-size", config.BatchSize, "Batch size for vacuum jobs") + flag.IntVar(&config.TargetUtilization, "target-utilization", config.TargetUtilization, "Target utilization percentage") + flag.IntVar(&config.BatchSize, "batch-size", config.BatchSize, "Batch size") flag.DurationVar(&config.VacuumInterval, "vacuum-interval", config.VacuumInterval, "Vacuum interval") flag.IntVar(&config.MaxConcurrentJobs, "max-concurrent-jobs", config.MaxConcurrentJobs, "Max concurrent jobs") flag.DurationVar(&config.HealthCheckInterval, "health-check-interval", config.HealthCheckInterval, "Health check interval") - flag.DurationVar(&config.JobTimeout, "job-timeout", config.JobTimeout, "Job timeout") - flag.IntVar(&config.DeadSpaceThreshold, "dead-space-threshold", config.DeadSpaceThreshold, "Dead space threshold percent") + flag.IntVar(&config.DeadSpaceThreshold, "dead-space-threshold", config.DeadSpaceThreshold, "Dead space threshold percentage") flag.Parse() @@ -360,9 +328,3 @@ func (w *Worker) ListenAndServe(port int) error { log.Printf("Worker listening on port %d", port) return server.Serve(listener) } - -// GetActiveJobIDs returns a list of active job IDs -func (w *Worker) GetActiveJobIDs() []string { - ids := make([]string, 0, len(w.activeJobs)) - for id := range w.activeJobs { - ids = append(ids, id)