package vacuum import ( "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" ) // ExecutionStatus tracks job execution status type ExecutionStatus string const ( StatusAnalyzing ExecutionStatus = "analyzing" StatusDefragmenting ExecutionStatus = "defragmenting" StatusOptimizing ExecutionStatus = "optimizing" StatusVerifying ExecutionStatus = "verifying" StatusCompleted ExecutionStatus = "completed" StatusFailed ExecutionStatus = "failed" ) // ExecutionStep represents a step in the vacuum pipeline type ExecutionStep struct { Name string Status ExecutionStatus StartTime *time.Time EndTime *time.Time Progress float32 ErrorMsg string } // Executor handles vacuum execution type Executor struct { config *ExecutorConfig } // ExecutorConfig contains executor configuration type ExecutorConfig struct { MinVolumeSize uint64 MaxVolumeSize uint64 TimeoutPerStep time.Duration MaxRetries int } // NewExecutor creates a new vacuum executor func NewExecutor(config *ExecutorConfig) *Executor { if config == nil { config = &ExecutorConfig{ MinVolumeSize: 500, MaxVolumeSize: 5000, TimeoutPerStep: 3 * time.Minute, MaxRetries: 3, } } return &Executor{config: config} } // VacuumExecutionResult contains the result of vacuum operation type VacuumExecutionResult struct { VolumeID uint32 Success bool StartTime time.Time EndTime time.Time TotalDuration time.Duration SpaceFreed uint64 FilesMoved int64 FragmentsBefore int64 FragmentsAfter int64 Metadata map[string]string Steps []*ExecutionStep ErrorMessage string } // ExecuteJob executes the vacuum operation for a volume func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutionResult, error) { result := &VacuumExecutionResult{ Success: false, StartTime: time.Now(), Metadata: make(map[string]string), Steps: make([]*ExecutionStep, 0), } // Extract volume ID from payload volumeID := extractVolumeID(job.Payload) result.VolumeID = volumeID // Step 1: Analyze fragmentation if err := e.analyzeFragmentation(result); err != nil { result.ErrorMessage = fmt.Sprintf("analysis failed: %v", err) result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) return result, err } // Step 2: Defragment volume if err := e.defragmentVolume(result); err != nil { result.ErrorMessage = fmt.Sprintf("defragment failed: %v", err) result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) return result, err } // Step 3: Optimize storage if err := e.optimizeStorage(result); err != nil { result.ErrorMessage = fmt.Sprintf("optimize failed: %v", err) result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) return result, err } // Step 4: Verify result if err := e.verifyResult(result); err != nil { result.ErrorMessage = fmt.Sprintf("verification failed: %v", err) result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) return result, err } result.Success = true result.EndTime = time.Now() result.TotalDuration = result.EndTime.Sub(result.StartTime) return result, nil } // analyzeFragmentation analyzes the volume fragmentation func (e *Executor) analyzeFragmentation(result *VacuumExecutionResult) error { step := &ExecutionStep{ Name: "analyzing", Status: StatusAnalyzing, Progress: 0, } now := time.Now() step.StartTime = &now // Simulate analysis time.Sleep(50 * time.Millisecond) result.FragmentsBefore = 1500 step.Progress = 100 step.EndTime = &now result.Steps = append(result.Steps, step) return nil } // defragmentVolume defragments the volume func (e *Executor) defragmentVolume(result *VacuumExecutionResult) error { step := &ExecutionStep{ Name: "defragmenting", Status: StatusDefragmenting, Progress: 0, } now := time.Now() step.StartTime = &now // Simulate defragmentation with progress for i := 0; i < 10; i++ { time.Sleep(20 * time.Millisecond) step.Progress = float32((i + 1) * 10) } result.FilesMoved = 850 result.SpaceFreed = 1000000 step.Progress = 100 step.EndTime = &now result.Steps = append(result.Steps, step) return nil } // optimizeStorage optimizes storage layout func (e *Executor) optimizeStorage(result *VacuumExecutionResult) error { step := &ExecutionStep{ Name: "optimizing", Status: StatusOptimizing, Progress: 0, } now := time.Now() step.StartTime = &now // Simulate optimization for i := 0; i < 5; i++ { time.Sleep(30 * time.Millisecond) step.Progress = float32((i + 1) * 20) } step.Progress = 100 step.EndTime = &now result.Steps = append(result.Steps, step) return nil } // verifyResult verifies the operation result func (e *Executor) verifyResult(result *VacuumExecutionResult) error { step := &ExecutionStep{ Name: "verifying", Status: StatusVerifying, Progress: 0, } now := time.Now() step.StartTime = &now // Simulate verification time.Sleep(50 * time.Millisecond) result.FragmentsAfter = 320 result.Metadata["space_freed_mb"] = "1" result.Metadata["files_moved"] = "850" result.Metadata["fragmentation_reduction"] = "78.7%" step.Progress = 100 step.EndTime = &now result.Steps = append(result.Steps, step) return nil } // extractVolumeID extracts the volume ID from job payload func extractVolumeID(payload *plugin_pb.JobPayload) uint32 { if payload == nil || len(payload.Data) < 4 { return 0 } return uint32(payload.Data[0]) | (uint32(payload.Data[1]) << 8) | (uint32(payload.Data[2]) << 16) | (uint32(payload.Data[3]) << 24) } // ValidateExecutionResult validates the result of execution func ValidateExecutionResult(result *VacuumExecutionResult) bool { if !result.Success { return false } if result.EndTime.Before(result.StartTime) { return false } if len(result.Steps) != 4 { return false } return true }