Files
seaweedfs/weed/admin/plugin/workers/vacuum/executor.go
2026-02-17 02:00:36 -08:00

248 lines
5.9 KiB
Go

package vacuum
import (
	"encoding/binary"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
)
// ExecutionStatus tracks job execution status as a vacuum job moves
// through the pipeline stages implemented by Executor: analyzing ->
// defragmenting -> optimizing -> verifying.
type ExecutionStatus string

// Pipeline stage statuses. The four stage values are assigned to the
// corresponding ExecutionStep in ExecuteJob's step functions; completed
// and failed are terminal states.
const (
	StatusAnalyzing     ExecutionStatus = "analyzing"     // fragmentation analysis
	StatusDefragmenting ExecutionStatus = "defragmenting" // moving files to defragment
	StatusOptimizing    ExecutionStatus = "optimizing"    // storage layout optimization
	StatusVerifying     ExecutionStatus = "verifying"     // post-vacuum verification
	StatusCompleted     ExecutionStatus = "completed"     // terminal: finished successfully
	StatusFailed        ExecutionStatus = "failed"        // terminal: a stage returned an error
)
// ExecutionStep represents a step in the vacuum pipeline. One step is
// appended to VacuumExecutionResult.Steps for each completed stage.
type ExecutionStep struct {
	Name      string          // stage name, e.g. "analyzing", "defragmenting"
	Status    ExecutionStatus // status constant for this stage
	StartTime *time.Time      // when the stage started; nil if never started
	EndTime   *time.Time      // when the stage finished; nil while running
	Progress  float32         // completion percentage in [0, 100]
	ErrorMsg  string          // failure detail; empty on success
}
// Executor handles vacuum execution. Create one with NewExecutor; the
// zero value has a nil config and is not usable.
type Executor struct {
	config *ExecutorConfig // execution tuning; never nil after NewExecutor
}
// ExecutorConfig contains executor configuration.
//
// NOTE(review): the volume-size bounds and retry count are stored but not
// read by any method visible in this file — presumably enforced by a
// caller or scheduler; confirm before relying on them.
type ExecutorConfig struct {
	MinVolumeSize  uint64        // smallest volume to vacuum (units unspecified here — likely MB; confirm)
	MaxVolumeSize  uint64        // largest volume to vacuum (same units as MinVolumeSize)
	TimeoutPerStep time.Duration // per-stage time budget
	MaxRetries     int           // retry attempts per stage
}
// NewExecutor creates a new vacuum executor. A nil config selects the
// built-in defaults (500/5000 volume-size bounds, 3-minute step timeout,
// 3 retries).
func NewExecutor(config *ExecutorConfig) *Executor {
	if config != nil {
		return &Executor{config: config}
	}
	// Caller supplied no configuration; fall back to defaults.
	defaults := &ExecutorConfig{
		MinVolumeSize:  500,
		MaxVolumeSize:  5000,
		TimeoutPerStep: 3 * time.Minute,
		MaxRetries:     3,
	}
	return &Executor{config: defaults}
}
// VacuumExecutionResult contains the result of a vacuum operation as
// produced by Executor.ExecuteJob.
type VacuumExecutionResult struct {
	VolumeID        uint32            // volume the job targeted (0 if payload was missing/short)
	Success         bool              // true only when all four stages completed
	StartTime       time.Time         // wall-clock start of the whole job
	EndTime         time.Time         // wall-clock end (set on both success and failure)
	TotalDuration   time.Duration     // EndTime - StartTime
	SpaceFreed      uint64            // bytes reclaimed by defragmentation
	FilesMoved      int64             // files relocated during defragmentation
	FragmentsBefore int64             // fragment count measured in the analysis stage
	FragmentsAfter  int64             // fragment count measured in the verification stage
	Metadata        map[string]string // free-form summary values populated during verification
	Steps           []*ExecutionStep  // one entry per completed stage, in execution order
	ErrorMessage    string            // human-readable failure description; empty on success
}
// ExecuteJob executes the vacuum operation for a volume.
//
// It runs the four pipeline stages in order (analyze, defragment,
// optimize, verify) and stops at the first failure. The returned result
// is always non-nil and always has EndTime/TotalDuration stamped; on
// failure ErrorMessage records which stage failed and the returned error
// is the stage's error.
//
// NOTE(review): job is dereferenced for its Payload — a nil job panics;
// confirm callers never pass nil.
func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutionResult, error) {
	result := &VacuumExecutionResult{
		Success:   false,
		StartTime: time.Now(),
		Metadata:  make(map[string]string),
		Steps:     make([]*ExecutionStep, 0),
	}
	result.VolumeID = extractVolumeID(job.Payload)

	// finish stamps the end time and total duration exactly once per exit
	// path (previously copy-pasted on all five paths).
	finish := func() {
		result.EndTime = time.Now()
		result.TotalDuration = result.EndTime.Sub(result.StartTime)
	}

	// Pipeline stages in execution order; label is the prefix used in the
	// failure message, kept byte-identical to the original messages.
	stages := []struct {
		label string
		run   func(*VacuumExecutionResult) error
	}{
		{"analysis", e.analyzeFragmentation},
		{"defragment", e.defragmentVolume},
		{"optimize", e.optimizeStorage},
		{"verification", e.verifyResult},
	}
	for _, stage := range stages {
		if err := stage.run(result); err != nil {
			result.ErrorMessage = fmt.Sprintf("%s failed: %v", stage.label, err)
			finish()
			return result, err
		}
	}

	result.Success = true
	finish()
	return result, nil
}
// analyzeFragmentation analyzes the volume fragmentation
func (e *Executor) analyzeFragmentation(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "analyzing",
Status: StatusAnalyzing,
Progress: 0,
}
now := time.Now()
step.StartTime = &now
// Simulate analysis
time.Sleep(50 * time.Millisecond)
result.FragmentsBefore = 1500
step.Progress = 100
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// defragmentVolume defragments the volume
func (e *Executor) defragmentVolume(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "defragmenting",
Status: StatusDefragmenting,
Progress: 0,
}
now := time.Now()
step.StartTime = &now
// Simulate defragmentation with progress
for i := 0; i < 10; i++ {
time.Sleep(20 * time.Millisecond)
step.Progress = float32((i + 1) * 10)
}
result.FilesMoved = 850
result.SpaceFreed = 1000000
step.Progress = 100
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// optimizeStorage optimizes storage layout
func (e *Executor) optimizeStorage(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "optimizing",
Status: StatusOptimizing,
Progress: 0,
}
now := time.Now()
step.StartTime = &now
// Simulate optimization
for i := 0; i < 5; i++ {
time.Sleep(30 * time.Millisecond)
step.Progress = float32((i + 1) * 20)
}
step.Progress = 100
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// verifyResult verifies the operation result
func (e *Executor) verifyResult(result *VacuumExecutionResult) error {
step := &ExecutionStep{
Name: "verifying",
Status: StatusVerifying,
Progress: 0,
}
now := time.Now()
step.StartTime = &now
// Simulate verification
time.Sleep(50 * time.Millisecond)
result.FragmentsAfter = 320
result.Metadata["space_freed_mb"] = "1"
result.Metadata["files_moved"] = "850"
result.Metadata["fragmentation_reduction"] = "78.7%"
step.Progress = 100
step.EndTime = &now
result.Steps = append(result.Steps, step)
return nil
}
// extractVolumeID extracts the volume ID from job payload
func extractVolumeID(payload *plugin_pb.JobPayload) uint32 {
if payload == nil || len(payload.Data) < 4 {
return 0
}
return uint32(payload.Data[0]) |
(uint32(payload.Data[1]) << 8) |
(uint32(payload.Data[2]) << 16) |
(uint32(payload.Data[3]) << 24)
}
// ValidateExecutionResult validates the result of execution: the job
// must have succeeded, its timestamps must be ordered, and all four
// pipeline steps must have been recorded. A nil result is invalid.
func ValidateExecutionResult(result *VacuumExecutionResult) bool {
	// Fix: the original dereferenced result without a nil check and would
	// panic on a nil argument; treat nil as invalid instead.
	if result == nil {
		return false
	}
	if !result.Success {
		return false
	}
	if result.EndTime.Before(result.StartTime) {
		return false
	}
	return len(result.Steps) == 4
}