mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-17 07:11:30 +00:00
248 lines
5.9 KiB
Go
248 lines
5.9 KiB
Go
package vacuum
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
)
|
|
|
|
// ExecutionStatus tracks job execution status
|
|
type ExecutionStatus string
|
|
|
|
const (
|
|
StatusAnalyzing ExecutionStatus = "analyzing"
|
|
StatusDefragmenting ExecutionStatus = "defragmenting"
|
|
StatusOptimizing ExecutionStatus = "optimizing"
|
|
StatusVerifying ExecutionStatus = "verifying"
|
|
StatusCompleted ExecutionStatus = "completed"
|
|
StatusFailed ExecutionStatus = "failed"
|
|
)
|
|
|
|
// ExecutionStep represents a step in the vacuum pipeline
|
|
type ExecutionStep struct {
|
|
Name string
|
|
Status ExecutionStatus
|
|
StartTime *time.Time
|
|
EndTime *time.Time
|
|
Progress float32
|
|
ErrorMsg string
|
|
}
|
|
|
|
// Executor handles vacuum execution
|
|
type Executor struct {
|
|
config *ExecutorConfig
|
|
}
|
|
|
|
// ExecutorConfig contains executor configuration
|
|
type ExecutorConfig struct {
|
|
MinVolumeSize uint64
|
|
MaxVolumeSize uint64
|
|
TimeoutPerStep time.Duration
|
|
MaxRetries int
|
|
}
|
|
|
|
// NewExecutor creates a new vacuum executor
|
|
func NewExecutor(config *ExecutorConfig) *Executor {
|
|
if config == nil {
|
|
config = &ExecutorConfig{
|
|
MinVolumeSize: 500,
|
|
MaxVolumeSize: 5000,
|
|
TimeoutPerStep: 3 * time.Minute,
|
|
MaxRetries: 3,
|
|
}
|
|
}
|
|
return &Executor{config: config}
|
|
}
|
|
|
|
// VacuumExecutionResult contains the result of vacuum operation
|
|
type VacuumExecutionResult struct {
|
|
VolumeID uint32
|
|
Success bool
|
|
StartTime time.Time
|
|
EndTime time.Time
|
|
TotalDuration time.Duration
|
|
SpaceFreed uint64
|
|
FilesMoved int64
|
|
FragmentsBefore int64
|
|
FragmentsAfter int64
|
|
Metadata map[string]string
|
|
Steps []*ExecutionStep
|
|
ErrorMessage string
|
|
}
|
|
|
|
// ExecuteJob executes the vacuum operation for a volume
|
|
func (e *Executor) ExecuteJob(job *plugin_pb.ExecuteJobRequest) (*VacuumExecutionResult, error) {
|
|
result := &VacuumExecutionResult{
|
|
Success: false,
|
|
StartTime: time.Now(),
|
|
Metadata: make(map[string]string),
|
|
Steps: make([]*ExecutionStep, 0),
|
|
}
|
|
|
|
// Extract volume ID from payload
|
|
volumeID := extractVolumeID(job.Payload)
|
|
result.VolumeID = volumeID
|
|
|
|
// Step 1: Analyze fragmentation
|
|
if err := e.analyzeFragmentation(result); err != nil {
|
|
result.ErrorMessage = fmt.Sprintf("analysis failed: %v", err)
|
|
result.EndTime = time.Now()
|
|
result.TotalDuration = result.EndTime.Sub(result.StartTime)
|
|
return result, err
|
|
}
|
|
|
|
// Step 2: Defragment volume
|
|
if err := e.defragmentVolume(result); err != nil {
|
|
result.ErrorMessage = fmt.Sprintf("defragment failed: %v", err)
|
|
result.EndTime = time.Now()
|
|
result.TotalDuration = result.EndTime.Sub(result.StartTime)
|
|
return result, err
|
|
}
|
|
|
|
// Step 3: Optimize storage
|
|
if err := e.optimizeStorage(result); err != nil {
|
|
result.ErrorMessage = fmt.Sprintf("optimize failed: %v", err)
|
|
result.EndTime = time.Now()
|
|
result.TotalDuration = result.EndTime.Sub(result.StartTime)
|
|
return result, err
|
|
}
|
|
|
|
// Step 4: Verify result
|
|
if err := e.verifyResult(result); err != nil {
|
|
result.ErrorMessage = fmt.Sprintf("verification failed: %v", err)
|
|
result.EndTime = time.Now()
|
|
result.TotalDuration = result.EndTime.Sub(result.StartTime)
|
|
return result, err
|
|
}
|
|
|
|
result.Success = true
|
|
result.EndTime = time.Now()
|
|
result.TotalDuration = result.EndTime.Sub(result.StartTime)
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// analyzeFragmentation analyzes the volume fragmentation
|
|
func (e *Executor) analyzeFragmentation(result *VacuumExecutionResult) error {
|
|
step := &ExecutionStep{
|
|
Name: "analyzing",
|
|
Status: StatusAnalyzing,
|
|
Progress: 0,
|
|
}
|
|
now := time.Now()
|
|
step.StartTime = &now
|
|
|
|
// Simulate analysis
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
result.FragmentsBefore = 1500
|
|
step.Progress = 100
|
|
step.EndTime = &now
|
|
result.Steps = append(result.Steps, step)
|
|
|
|
return nil
|
|
}
|
|
|
|
// defragmentVolume defragments the volume
|
|
func (e *Executor) defragmentVolume(result *VacuumExecutionResult) error {
|
|
step := &ExecutionStep{
|
|
Name: "defragmenting",
|
|
Status: StatusDefragmenting,
|
|
Progress: 0,
|
|
}
|
|
now := time.Now()
|
|
step.StartTime = &now
|
|
|
|
// Simulate defragmentation with progress
|
|
for i := 0; i < 10; i++ {
|
|
time.Sleep(20 * time.Millisecond)
|
|
step.Progress = float32((i + 1) * 10)
|
|
}
|
|
|
|
result.FilesMoved = 850
|
|
result.SpaceFreed = 1000000
|
|
|
|
step.Progress = 100
|
|
step.EndTime = &now
|
|
result.Steps = append(result.Steps, step)
|
|
|
|
return nil
|
|
}
|
|
|
|
// optimizeStorage optimizes storage layout
|
|
func (e *Executor) optimizeStorage(result *VacuumExecutionResult) error {
|
|
step := &ExecutionStep{
|
|
Name: "optimizing",
|
|
Status: StatusOptimizing,
|
|
Progress: 0,
|
|
}
|
|
now := time.Now()
|
|
step.StartTime = &now
|
|
|
|
// Simulate optimization
|
|
for i := 0; i < 5; i++ {
|
|
time.Sleep(30 * time.Millisecond)
|
|
step.Progress = float32((i + 1) * 20)
|
|
}
|
|
|
|
step.Progress = 100
|
|
step.EndTime = &now
|
|
result.Steps = append(result.Steps, step)
|
|
|
|
return nil
|
|
}
|
|
|
|
// verifyResult verifies the operation result
|
|
func (e *Executor) verifyResult(result *VacuumExecutionResult) error {
|
|
step := &ExecutionStep{
|
|
Name: "verifying",
|
|
Status: StatusVerifying,
|
|
Progress: 0,
|
|
}
|
|
now := time.Now()
|
|
step.StartTime = &now
|
|
|
|
// Simulate verification
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
result.FragmentsAfter = 320
|
|
result.Metadata["space_freed_mb"] = "1"
|
|
result.Metadata["files_moved"] = "850"
|
|
result.Metadata["fragmentation_reduction"] = "78.7%"
|
|
|
|
step.Progress = 100
|
|
step.EndTime = &now
|
|
result.Steps = append(result.Steps, step)
|
|
|
|
return nil
|
|
}
|
|
|
|
// extractVolumeID extracts the volume ID from job payload
|
|
func extractVolumeID(payload *plugin_pb.JobPayload) uint32 {
|
|
if payload == nil || len(payload.Data) < 4 {
|
|
return 0
|
|
}
|
|
return uint32(payload.Data[0]) |
|
|
(uint32(payload.Data[1]) << 8) |
|
|
(uint32(payload.Data[2]) << 16) |
|
|
(uint32(payload.Data[3]) << 24)
|
|
}
|
|
|
|
// ValidateExecutionResult validates the result of execution
|
|
func ValidateExecutionResult(result *VacuumExecutionResult) bool {
|
|
if !result.Success {
|
|
return false
|
|
}
|
|
|
|
if result.EndTime.Before(result.StartTime) {
|
|
return false
|
|
}
|
|
|
|
if len(result.Steps) != 4 {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|