mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-20 16:51:31 +00:00
remove "enhanced" reference
This commit is contained in:
@@ -10,9 +10,9 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
)
|
||||
|
||||
// TestEnhancedECIntegration tests the enhanced EC implementation with the admin server
|
||||
func TestEnhancedECIntegration(t *testing.T) {
|
||||
t.Logf("Starting enhanced EC integration test")
|
||||
// TestECIntegration tests the EC implementation with the admin server
|
||||
func TestECIntegration(t *testing.T) {
|
||||
t.Logf("Starting EC integration test")
|
||||
|
||||
// Step 1: Create admin server
|
||||
config := &MinimalAdminConfig{
|
||||
@@ -51,7 +51,7 @@ func TestEnhancedECIntegration(t *testing.T) {
|
||||
|
||||
// Step 3: Create an EC task
|
||||
ecTask := &types.Task{
|
||||
ID: "enhanced-ec-task-1",
|
||||
ID: "ec-task-1",
|
||||
Type: types.TaskTypeErasureCoding,
|
||||
VolumeID: 12345,
|
||||
Server: "localhost:8080",
|
||||
@@ -70,7 +70,7 @@ func TestEnhancedECIntegration(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to queue EC task: %v", err)
|
||||
}
|
||||
t.Logf("Successfully queued enhanced EC task %s for volume %d", ecTask.ID, ecTask.VolumeID)
|
||||
t.Logf("Successfully queued EC task %s for volume %d", ecTask.ID, ecTask.VolumeID)
|
||||
|
||||
// Step 4: Worker requests the task
|
||||
assignedTask, err := adminServer.RequestTask("ec-worker-1", []types.TaskType{types.TaskTypeErasureCoding})
|
||||
@@ -82,8 +82,8 @@ func TestEnhancedECIntegration(t *testing.T) {
|
||||
t.Logf("EC worker got task: %s (%s) for volume %d",
|
||||
assignedTask.ID, assignedTask.Type, assignedTask.VolumeID)
|
||||
|
||||
// Step 5: Simulate enhanced EC task execution progress
|
||||
t.Logf("Simulating enhanced EC task execution phases")
|
||||
// Step 5: Simulate EC task execution phases
|
||||
t.Logf("Simulating EC task execution phases")
|
||||
|
||||
// Phase 1: Copying volume data
|
||||
err = adminServer.UpdateTaskProgress(assignedTask.ID, 15.0)
|
||||
@@ -132,7 +132,7 @@ func TestEnhancedECIntegration(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Failed to complete EC task: %v", err)
|
||||
}
|
||||
t.Logf("Successfully completed enhanced EC task %s", assignedTask.ID)
|
||||
t.Logf("Successfully completed EC task %s", assignedTask.ID)
|
||||
} else {
|
||||
t.Logf("No EC task was assigned (expected in test environment)")
|
||||
}
|
||||
@@ -151,16 +151,16 @@ func TestEnhancedECIntegration(t *testing.T) {
|
||||
lastEntry.TaskID, lastEntry.TaskType, lastEntry.Duration)
|
||||
|
||||
if lastEntry.TaskType == types.TaskTypeErasureCoding {
|
||||
t.Logf("Enhanced EC task completed successfully")
|
||||
t.Logf("EC task completed successfully")
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("Enhanced EC integration test completed successfully")
|
||||
t.Logf("EC integration test completed successfully")
|
||||
}
|
||||
|
||||
// TestEnhancedECTaskValidation tests the enhanced EC task validation
|
||||
func TestEnhancedECTaskValidation(t *testing.T) {
|
||||
t.Logf("Testing enhanced EC task validation")
|
||||
// TestECTaskValidation tests the EC task validation
|
||||
func TestECTaskValidation(t *testing.T) {
|
||||
t.Logf("Testing EC task validation")
|
||||
|
||||
// Create a temporary work directory
|
||||
workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_test")
|
||||
@@ -170,8 +170,8 @@ func TestEnhancedECTaskValidation(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(workDir)
|
||||
|
||||
// Create enhanced EC task
|
||||
enhancedTask := ec_task.NewEnhancedECTask(
|
||||
// Create EC task
|
||||
ecTask := ec_task.NewTaskWithParams(
|
||||
"localhost:8080", // source server
|
||||
12345, // volume ID
|
||||
"localhost:9333", // master client
|
||||
@@ -188,7 +188,7 @@ func TestEnhancedECTaskValidation(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err = enhancedTask.Validate(validParams)
|
||||
err = ecTask.Validate(validParams)
|
||||
if err != nil {
|
||||
t.Errorf("Valid parameters should pass validation: %v", err)
|
||||
}
|
||||
@@ -199,25 +199,25 @@ func TestEnhancedECTaskValidation(t *testing.T) {
|
||||
Server: "", // Empty server
|
||||
}
|
||||
|
||||
err = enhancedTask.Validate(invalidParams)
|
||||
err = ecTask.Validate(invalidParams)
|
||||
if err == nil {
|
||||
t.Errorf("Invalid parameters should fail validation")
|
||||
}
|
||||
|
||||
// Test time estimation
|
||||
estimatedTime := enhancedTask.EstimateTime(validParams)
|
||||
estimatedTime := ecTask.EstimateTime(validParams)
|
||||
t.Logf("Estimated time for 32GB volume EC: %v", estimatedTime)
|
||||
|
||||
if estimatedTime < 20*time.Minute {
|
||||
t.Errorf("Expected at least 20 minutes for large volume EC, got %v", estimatedTime)
|
||||
}
|
||||
|
||||
t.Logf("Enhanced EC task validation completed successfully")
|
||||
t.Logf("EC task validation completed successfully")
|
||||
}
|
||||
|
||||
// TestEnhancedECFeatures tests specific enhanced EC features
|
||||
func TestEnhancedECFeatures(t *testing.T) {
|
||||
t.Logf("Testing enhanced EC features")
|
||||
// TestECFeatures tests specific EC features
|
||||
func TestECFeatures(t *testing.T) {
|
||||
t.Logf("Testing EC features")
|
||||
|
||||
// Create temporary work directory
|
||||
workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_features_test")
|
||||
@@ -227,7 +227,7 @@ func TestEnhancedECFeatures(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(workDir)
|
||||
|
||||
enhancedTask := ec_task.NewEnhancedECTask(
|
||||
ecTask := ec_task.NewTaskWithParams(
|
||||
"localhost:8080",
|
||||
54321,
|
||||
"localhost:9333",
|
||||
@@ -237,17 +237,17 @@ func TestEnhancedECFeatures(t *testing.T) {
|
||||
// Test step tracking
|
||||
t.Logf("Testing step tracking functionality")
|
||||
|
||||
currentStep := enhancedTask.GetCurrentStep()
|
||||
currentStep := ecTask.GetCurrentStep()
|
||||
t.Logf("Initial current step: %s", currentStep)
|
||||
|
||||
progress := enhancedTask.GetProgress()
|
||||
progress := ecTask.GetProgress()
|
||||
t.Logf("Initial progress: %.1f%%", progress)
|
||||
|
||||
// Test parameter extraction
|
||||
params := types.TaskParams{
|
||||
VolumeID: 54321,
|
||||
Server: "localhost:8080",
|
||||
Collection: "enhanced_test",
|
||||
Collection: "features_test",
|
||||
Parameters: map[string]interface{}{
|
||||
"volume_size": int64(64 * 1024 * 1024 * 1024), // 64GB
|
||||
"data_shards": 10,
|
||||
@@ -256,7 +256,7 @@ func TestEnhancedECFeatures(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
estimatedTime := enhancedTask.EstimateTime(params)
|
||||
estimatedTime := ecTask.EstimateTime(params)
|
||||
expectedMinTime := time.Duration(64*2) * time.Minute // 2 minutes per GB
|
||||
|
||||
t.Logf("64GB volume estimated time: %v (expected minimum: %v)", estimatedTime, expectedMinTime)
|
||||
@@ -265,15 +265,15 @@ func TestEnhancedECFeatures(t *testing.T) {
|
||||
t.Errorf("Time estimate seems too low for 64GB volume")
|
||||
}
|
||||
|
||||
t.Logf("Enhanced EC features test completed successfully")
|
||||
t.Logf("EC features test completed successfully")
|
||||
}
|
||||
|
||||
// TestECTaskComparison compares basic vs enhanced EC implementations
|
||||
// TestECTaskComparison tests EC implementation features
|
||||
func TestECTaskComparison(t *testing.T) {
|
||||
t.Logf("Comparing basic vs enhanced EC implementations")
|
||||
t.Logf("Testing EC implementation features")
|
||||
|
||||
// Basic EC task estimation
|
||||
basicParams := types.TaskParams{
|
||||
// EC task estimation
|
||||
params := types.TaskParams{
|
||||
VolumeID: 11111,
|
||||
Server: "localhost:8080",
|
||||
Parameters: map[string]interface{}{
|
||||
@@ -281,44 +281,29 @@ func TestECTaskComparison(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
// Create basic task (existing implementation)
|
||||
basicTask := ec_task.NewTask("localhost:8080", 11111)
|
||||
basicTime := basicTask.EstimateTime(basicParams)
|
||||
|
||||
// Create enhanced task
|
||||
// Create task
|
||||
workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_comparison")
|
||||
defer os.RemoveAll(workDir)
|
||||
|
||||
enhancedTask := ec_task.NewEnhancedECTask(
|
||||
ecTask := ec_task.NewTaskWithParams(
|
||||
"localhost:8080",
|
||||
22222,
|
||||
"localhost:9333",
|
||||
workDir,
|
||||
)
|
||||
enhancedTime := enhancedTask.EstimateTime(basicParams)
|
||||
estimatedTime := ecTask.EstimateTime(params)
|
||||
|
||||
t.Logf("Basic EC task estimated time: %v", basicTime)
|
||||
t.Logf("Enhanced EC task estimated time: %v", enhancedTime)
|
||||
t.Logf("EC task estimated time: %v", estimatedTime)
|
||||
|
||||
// Enhanced should take longer due to additional processing
|
||||
if enhancedTime <= basicTime {
|
||||
t.Logf("Note: Enhanced EC might take longer due to local processing and smart distribution")
|
||||
}
|
||||
// Test feature capabilities
|
||||
t.Logf("EC implementation features:")
|
||||
t.Logf(" - Local volume data copying with progress tracking")
|
||||
t.Logf(" - Local Reed-Solomon encoding (10+4 shards)")
|
||||
t.Logf(" - Intelligent shard placement with rack awareness")
|
||||
t.Logf(" - Load balancing across available servers")
|
||||
t.Logf(" - Backup server selection for redundancy")
|
||||
t.Logf(" - Detailed step-by-step progress tracking")
|
||||
t.Logf(" - Comprehensive error handling and recovery")
|
||||
|
||||
// Test feature differences
|
||||
t.Logf("Basic EC features:")
|
||||
t.Logf(" - Direct volume server EC generation")
|
||||
t.Logf(" - Simple shard mounting")
|
||||
t.Logf(" - No custom placement logic")
|
||||
|
||||
t.Logf("Enhanced EC features:")
|
||||
t.Logf(" - Local volume data copying")
|
||||
t.Logf(" - Local Reed-Solomon encoding")
|
||||
t.Logf(" - Intelligent shard placement with affinity")
|
||||
t.Logf(" - Rack diversity for data shards")
|
||||
t.Logf(" - Load balancing across servers")
|
||||
t.Logf(" - Backup server selection")
|
||||
t.Logf(" - Detailed progress tracking")
|
||||
|
||||
t.Logf("EC task comparison completed successfully")
|
||||
t.Logf("EC implementation test completed successfully")
|
||||
}
|
||||
488
weed/admin/task/ec_worker_test.go
Normal file
488
weed/admin/task/ec_worker_test.go
Normal file
@@ -0,0 +1,488 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
)
|
||||
|
||||
// TestECWorkerIntegration tests the complete EC worker functionality
|
||||
func TestECWorkerIntegration(t *testing.T) {
|
||||
t.Logf("Starting EC worker integration test")
|
||||
|
||||
// Step 1: Create admin server with EC configuration
|
||||
config := &MinimalAdminConfig{
|
||||
ScanInterval: 5 * time.Second,
|
||||
WorkerTimeout: 60 * time.Second,
|
||||
TaskTimeout: 45 * time.Minute, // EC takes longer
|
||||
MaxRetries: 3,
|
||||
ReconcileInterval: 5 * time.Minute,
|
||||
EnableFailureRecovery: true,
|
||||
MaxConcurrentTasks: 1, // One at a time for EC
|
||||
}
|
||||
|
||||
adminServer := NewMinimalAdminServer(config, nil)
|
||||
err := adminServer.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start admin server: %v", err)
|
||||
}
|
||||
defer adminServer.Stop()
|
||||
t.Logf("✓ Admin server started successfully")
|
||||
|
||||
// Step 2: Register EC-capable worker
|
||||
worker := &types.Worker{
|
||||
ID: "ec-worker-1",
|
||||
Address: "localhost:9001",
|
||||
Capabilities: []types.TaskType{types.TaskTypeErasureCoding},
|
||||
MaxConcurrent: 1,
|
||||
Status: "active",
|
||||
CurrentLoad: 0,
|
||||
LastHeartbeat: time.Now(),
|
||||
}
|
||||
|
||||
err = adminServer.RegisterWorker(worker)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to register EC worker: %v", err)
|
||||
}
|
||||
t.Logf("✓ EC worker registered: %s", worker.ID)
|
||||
|
||||
// Step 3: Create work directory for EC processing
|
||||
workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_test")
|
||||
err = os.MkdirAll(workDir, 0755)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create work directory: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(workDir)
|
||||
t.Logf("✓ Work directory created: %s", workDir)
|
||||
|
||||
// Step 4: Create EC task with comprehensive parameters
|
||||
ecTask := &types.Task{
|
||||
ID: "ec-test-task-1",
|
||||
Type: types.TaskTypeErasureCoding,
|
||||
VolumeID: 54321,
|
||||
Server: "localhost:8080",
|
||||
Status: types.TaskStatusPending,
|
||||
Priority: types.TaskPriorityHigh,
|
||||
Parameters: map[string]interface{}{
|
||||
"volume_size": int64(64 * 1024 * 1024 * 1024), // 64GB volume
|
||||
"master_client": "localhost:9333",
|
||||
"work_dir": workDir,
|
||||
"collection": "test",
|
||||
"data_shards": 10,
|
||||
"parity_shards": 4,
|
||||
"rack_aware": true,
|
||||
"load_balance": true,
|
||||
},
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
err = adminServer.QueueTask(ecTask)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to queue EC task: %v", err)
|
||||
}
|
||||
t.Logf("✓ EC task queued: %s for volume %d", ecTask.ID, ecTask.VolumeID)
|
||||
|
||||
// Step 5: Worker requests and receives the EC task
|
||||
assignedTask, err := adminServer.RequestTask("ec-worker-1", []types.TaskType{types.TaskTypeErasureCoding})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to request EC task: %v", err)
|
||||
}
|
||||
|
||||
if assignedTask == nil {
|
||||
t.Fatalf("No EC task was assigned")
|
||||
}
|
||||
|
||||
t.Logf("✓ EC task assigned: %s (%s) for volume %d",
|
||||
assignedTask.ID, assignedTask.Type, assignedTask.VolumeID)
|
||||
|
||||
// Step 6: Test EC task creation and validation
|
||||
t.Logf("Testing EC task creation and validation")
|
||||
|
||||
// Create EC task instance directly
|
||||
factory := erasure_coding.NewFactory()
|
||||
taskParams := types.TaskParams{
|
||||
VolumeID: assignedTask.VolumeID,
|
||||
Server: assignedTask.Server,
|
||||
Collection: "test",
|
||||
Parameters: assignedTask.Parameters,
|
||||
}
|
||||
|
||||
taskInstance, err := factory.Create(taskParams)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create EC task instance: %v", err)
|
||||
}
|
||||
t.Logf("✓ EC task instance created successfully")
|
||||
|
||||
// Step 7: Validate task parameters
|
||||
err = taskInstance.Validate(taskParams)
|
||||
if err != nil {
|
||||
t.Errorf("EC task validation failed: %v", err)
|
||||
} else {
|
||||
t.Logf("✓ EC task validation passed")
|
||||
}
|
||||
|
||||
// Step 8: Test time estimation
|
||||
estimatedTime := taskInstance.EstimateTime(taskParams)
|
||||
expectedMinTime := time.Duration(64*2) * time.Minute // 2 minutes per GB for 64GB
|
||||
|
||||
t.Logf("✓ EC estimated time: %v (minimum expected: %v)", estimatedTime, expectedMinTime)
|
||||
|
||||
if estimatedTime < expectedMinTime {
|
||||
t.Logf("⚠ Note: Estimated time seems optimistic for 64GB volume")
|
||||
}
|
||||
|
||||
// Step 9: Simulate EC task execution phases
|
||||
t.Logf("Simulating EC execution phases:")
|
||||
|
||||
phases := []struct {
|
||||
progress float64
|
||||
phase string
|
||||
}{
|
||||
{5.0, "Initializing EC processing"},
|
||||
{15.0, "Volume data copied to local disk with progress tracking"},
|
||||
{25.0, "Source volume marked as read-only"},
|
||||
{45.0, "Local Reed-Solomon encoding (10+4 shards) completed"},
|
||||
{60.0, "Created 14 EC shards with verification"},
|
||||
{70.0, "Optimal shard placement calculated with rack awareness"},
|
||||
{85.0, "Intelligent shard distribution with load balancing"},
|
||||
{95.0, "Shard placement verified across multiple racks"},
|
||||
{100.0, "EC processing completed with cleanup"},
|
||||
}
|
||||
|
||||
for _, phase := range phases {
|
||||
err = adminServer.UpdateTaskProgress(assignedTask.ID, phase.progress)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to update task progress to %.1f%%: %v", phase.progress, err)
|
||||
} else {
|
||||
t.Logf(" %.1f%% - %s", phase.progress, phase.phase)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond) // Simulate processing time
|
||||
}
|
||||
|
||||
// Step 10: Complete the EC task
|
||||
err = adminServer.CompleteTask(assignedTask.ID, true, "")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to complete EC task: %v", err)
|
||||
} else {
|
||||
t.Logf("✓ EC task completed successfully")
|
||||
}
|
||||
|
||||
// Step 11: Verify EC task completion and metrics
|
||||
stats := adminServer.GetSystemStats()
|
||||
t.Logf("✓ Final stats: Active tasks=%d, Queued tasks=%d, Active workers=%d, Total tasks=%d",
|
||||
stats.ActiveTasks, stats.QueuedTasks, stats.ActiveWorkers, stats.TotalTasks)
|
||||
|
||||
history := adminServer.GetTaskHistory()
|
||||
t.Logf("✓ Task history contains %d completed tasks", len(history))
|
||||
|
||||
if len(history) > 0 {
|
||||
lastEntry := history[len(history)-1]
|
||||
t.Logf("✓ Last completed task: %s (%s) - Duration: %v",
|
||||
lastEntry.TaskID, lastEntry.TaskType, lastEntry.Duration)
|
||||
|
||||
if lastEntry.TaskType == types.TaskTypeErasureCoding {
|
||||
t.Logf("✅ EC task execution verified!")
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("✅ EC worker integration test completed successfully")
|
||||
}
|
||||
|
||||
// TestECFeatureValidation tests specific EC features
|
||||
func TestECFeatureValidation(t *testing.T) {
|
||||
t.Logf("Testing EC feature validation")
|
||||
|
||||
// Create work directory
|
||||
workDir := filepath.Join(os.TempDir(), "seaweedfs_ec_features_test")
|
||||
err := os.MkdirAll(workDir, 0755)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create work directory: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(workDir)
|
||||
|
||||
// Test EC task features
|
||||
ecTask := erasure_coding.NewTaskWithParams(
|
||||
"localhost:8080", // source server
|
||||
98765, // volume ID
|
||||
"localhost:9333", // master client
|
||||
workDir, // work directory
|
||||
)
|
||||
|
||||
// Test current step tracking
|
||||
currentStep := ecTask.GetCurrentStep()
|
||||
t.Logf("✓ Initial current step: '%s'", currentStep)
|
||||
|
||||
initialProgress := ecTask.GetProgress()
|
||||
t.Logf("✓ Initial progress: %.1f%%", initialProgress)
|
||||
|
||||
// Test parameter validation with features
|
||||
validParams := types.TaskParams{
|
||||
VolumeID: 98765,
|
||||
Server: "localhost:8080",
|
||||
Collection: "features_test",
|
||||
Parameters: map[string]interface{}{
|
||||
"volume_size": int64(128 * 1024 * 1024 * 1024), // 128GB
|
||||
"master_client": "localhost:9333",
|
||||
"work_dir": workDir,
|
||||
"data_shards": 10,
|
||||
"parity_shards": 4,
|
||||
"rack_awareness": true,
|
||||
"load_balancing": true,
|
||||
"backup_servers": 2,
|
||||
"affinity_zones": []string{"zone-a", "zone-b", "zone-c"},
|
||||
},
|
||||
}
|
||||
|
||||
err = ecTask.Validate(validParams)
|
||||
if err != nil {
|
||||
t.Errorf("Valid parameters should pass validation: %v", err)
|
||||
} else {
|
||||
t.Logf("✓ Parameter validation passed")
|
||||
}
|
||||
|
||||
// Test time estimation for large volume
|
||||
estimatedTime := ecTask.EstimateTime(validParams)
|
||||
expectedMinTime := time.Duration(128*2) * time.Minute // 2 minutes per GB
|
||||
|
||||
t.Logf("✓ 128GB volume estimated time: %v (expected minimum: %v)", estimatedTime, expectedMinTime)
|
||||
|
||||
if estimatedTime < expectedMinTime {
|
||||
t.Errorf("Time estimate seems too low for 128GB volume")
|
||||
}
|
||||
|
||||
// Test invalid parameters
|
||||
invalidParams := types.TaskParams{
|
||||
VolumeID: 0, // Invalid
|
||||
Server: "", // Invalid
|
||||
}
|
||||
|
||||
err = ecTask.Validate(invalidParams)
|
||||
if err == nil {
|
||||
t.Errorf("Invalid parameters should fail validation")
|
||||
} else {
|
||||
t.Logf("✓ Invalid parameter validation correctly failed: %v", err)
|
||||
}
|
||||
|
||||
t.Logf("✅ EC feature validation completed successfully")
|
||||
}
|
||||
|
||||
// TestECWorkflow tests the complete EC workflow
|
||||
func TestECWorkflow(t *testing.T) {
|
||||
t.Logf("Testing complete EC workflow")
|
||||
|
||||
// Create admin server
|
||||
config := &MinimalAdminConfig{
|
||||
ScanInterval: 10 * time.Second,
|
||||
WorkerTimeout: 30 * time.Second,
|
||||
TaskTimeout: 60 * time.Minute,
|
||||
MaxRetries: 3,
|
||||
ReconcileInterval: 5 * time.Minute,
|
||||
EnableFailureRecovery: true,
|
||||
MaxConcurrentTasks: 1,
|
||||
}
|
||||
|
||||
adminServer := NewMinimalAdminServer(config, nil)
|
||||
err := adminServer.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start admin server: %v", err)
|
||||
}
|
||||
defer adminServer.Stop()
|
||||
|
||||
// Register multiple workers with different capabilities
|
||||
workers := []*types.Worker{
|
||||
{
|
||||
ID: "ec-specialist-1",
|
||||
Address: "localhost:9001",
|
||||
Capabilities: []types.TaskType{types.TaskTypeErasureCoding},
|
||||
MaxConcurrent: 1,
|
||||
Status: "active",
|
||||
CurrentLoad: 0,
|
||||
LastHeartbeat: time.Now(),
|
||||
},
|
||||
{
|
||||
ID: "vacuum-worker-1",
|
||||
Address: "localhost:9002",
|
||||
Capabilities: []types.TaskType{types.TaskTypeVacuum},
|
||||
MaxConcurrent: 2,
|
||||
Status: "active",
|
||||
CurrentLoad: 0,
|
||||
LastHeartbeat: time.Now(),
|
||||
},
|
||||
{
|
||||
ID: "multi-capability-worker-1",
|
||||
Address: "localhost:9003",
|
||||
Capabilities: []types.TaskType{types.TaskTypeVacuum, types.TaskTypeErasureCoding},
|
||||
MaxConcurrent: 2,
|
||||
Status: "active",
|
||||
CurrentLoad: 0,
|
||||
LastHeartbeat: time.Now(),
|
||||
},
|
||||
}
|
||||
|
||||
for _, worker := range workers {
|
||||
err = adminServer.RegisterWorker(worker)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to register worker %s: %v", worker.ID, err)
|
||||
}
|
||||
t.Logf("✓ Registered worker %s with capabilities %v", worker.ID, worker.Capabilities)
|
||||
}
|
||||
|
||||
// Create test work directory
|
||||
workDir := filepath.Join(os.TempDir(), "seaweedfs_workflow_test")
|
||||
err = os.MkdirAll(workDir, 0755)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create work directory: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(workDir)
|
||||
|
||||
// Create multiple tasks of different types
|
||||
tasks := []*types.Task{
|
||||
{
|
||||
ID: "ec-workflow-1",
|
||||
Type: types.TaskTypeErasureCoding,
|
||||
VolumeID: 11111,
|
||||
Server: "localhost:8080",
|
||||
Status: types.TaskStatusPending,
|
||||
Priority: types.TaskPriorityHigh,
|
||||
Parameters: map[string]interface{}{
|
||||
"volume_size": int64(50 * 1024 * 1024 * 1024),
|
||||
"master_client": "localhost:9333",
|
||||
"work_dir": workDir,
|
||||
"collection": "workflow_test",
|
||||
},
|
||||
CreatedAt: time.Now(),
|
||||
},
|
||||
{
|
||||
ID: "vacuum-workflow-1",
|
||||
Type: types.TaskTypeVacuum,
|
||||
VolumeID: 22222,
|
||||
Server: "localhost:8081",
|
||||
Status: types.TaskStatusPending,
|
||||
Priority: types.TaskPriorityNormal,
|
||||
Parameters: map[string]interface{}{
|
||||
"garbage_threshold": "0.4",
|
||||
"volume_size": int64(20 * 1024 * 1024 * 1024),
|
||||
},
|
||||
CreatedAt: time.Now(),
|
||||
},
|
||||
{
|
||||
ID: "ec-workflow-2",
|
||||
Type: types.TaskTypeErasureCoding,
|
||||
VolumeID: 33333,
|
||||
Server: "localhost:8082",
|
||||
Status: types.TaskStatusPending,
|
||||
Priority: types.TaskPriorityNormal,
|
||||
Parameters: map[string]interface{}{
|
||||
"volume_size": int64(80 * 1024 * 1024 * 1024),
|
||||
"master_client": "localhost:9333",
|
||||
"work_dir": workDir,
|
||||
"collection": "workflow_test",
|
||||
},
|
||||
CreatedAt: time.Now(),
|
||||
},
|
||||
}
|
||||
|
||||
// Queue all tasks
|
||||
for _, task := range tasks {
|
||||
err = adminServer.QueueTask(task)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to queue task %s: %v", task.ID, err)
|
||||
}
|
||||
t.Logf("✓ Queued task %s (%s) for volume %d", task.ID, task.Type, task.VolumeID)
|
||||
}
|
||||
|
||||
// Test task assignment to appropriate workers
|
||||
t.Logf("Testing task assignments to appropriate workers")
|
||||
|
||||
// EC specialist should get EC tasks
|
||||
assignedTask, err := adminServer.RequestTask("ec-specialist-1", []types.TaskType{types.TaskTypeErasureCoding})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to request task for EC specialist: %v", err)
|
||||
} else if assignedTask != nil {
|
||||
t.Logf("✓ EC specialist got task: %s (%s)", assignedTask.ID, assignedTask.Type)
|
||||
|
||||
// Complete the task
|
||||
err = adminServer.UpdateTaskProgress(assignedTask.ID, 100.0)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to update progress: %v", err)
|
||||
}
|
||||
|
||||
err = adminServer.CompleteTask(assignedTask.ID, true, "")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to complete task: %v", err)
|
||||
}
|
||||
t.Logf("✓ EC task completed by specialist")
|
||||
}
|
||||
|
||||
// Vacuum worker should get vacuum tasks
|
||||
assignedTask, err = adminServer.RequestTask("vacuum-worker-1", []types.TaskType{types.TaskTypeVacuum})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to request task for vacuum worker: %v", err)
|
||||
} else if assignedTask != nil {
|
||||
t.Logf("✓ Vacuum worker got task: %s (%s)", assignedTask.ID, assignedTask.Type)
|
||||
|
||||
// Complete the task
|
||||
err = adminServer.UpdateTaskProgress(assignedTask.ID, 100.0)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to update progress: %v", err)
|
||||
}
|
||||
|
||||
err = adminServer.CompleteTask(assignedTask.ID, true, "")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to complete task: %v", err)
|
||||
}
|
||||
t.Logf("✓ Vacuum task completed by vacuum worker")
|
||||
}
|
||||
|
||||
// Multi-capability worker should get remaining tasks
|
||||
assignedTask, err = adminServer.RequestTask("multi-capability-worker-1", []types.TaskType{types.TaskTypeVacuum, types.TaskTypeErasureCoding})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to request task for multi-capability worker: %v", err)
|
||||
} else if assignedTask != nil {
|
||||
t.Logf("✓ Multi-capability worker got task: %s (%s)", assignedTask.ID, assignedTask.Type)
|
||||
|
||||
// Complete the task
|
||||
err = adminServer.UpdateTaskProgress(assignedTask.ID, 100.0)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to update progress: %v", err)
|
||||
}
|
||||
|
||||
err = adminServer.CompleteTask(assignedTask.ID, true, "")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to complete task: %v", err)
|
||||
}
|
||||
t.Logf("✓ Task completed by multi-capability worker")
|
||||
}
|
||||
|
||||
// Check final workflow statistics
|
||||
stats := adminServer.GetSystemStats()
|
||||
t.Logf("✓ Final workflow stats: Active tasks=%d, Queued tasks=%d, Active workers=%d, Total tasks=%d",
|
||||
stats.ActiveTasks, stats.QueuedTasks, stats.ActiveWorkers, stats.TotalTasks)
|
||||
|
||||
history := adminServer.GetTaskHistory()
|
||||
t.Logf("✓ Workflow history contains %d completed tasks", len(history))
|
||||
|
||||
// Analyze task completion by type
|
||||
ecTasks := 0
|
||||
vacuumTasks := 0
|
||||
|
||||
for _, entry := range history {
|
||||
switch entry.TaskType {
|
||||
case types.TaskTypeErasureCoding:
|
||||
ecTasks++
|
||||
t.Logf(" EC: %s - Worker: %s, Duration: %v",
|
||||
entry.TaskID, entry.WorkerID, entry.Duration)
|
||||
case types.TaskTypeVacuum:
|
||||
vacuumTasks++
|
||||
t.Logf(" Vacuum: %s - Worker: %s, Duration: %v",
|
||||
entry.TaskID, entry.WorkerID, entry.Duration)
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("✓ Completed tasks: %d EC, %d Vacuum", ecTasks, vacuumTasks)
|
||||
t.Logf("✅ EC workflow test completed successfully")
|
||||
}
|
||||
@@ -3,39 +3,95 @@ package erasure_coding
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Task implements erasure coding operation to convert volumes to EC format
|
||||
// Task implements comprehensive erasure coding with local processing and smart distribution
|
||||
type Task struct {
|
||||
*tasks.BaseTask
|
||||
server string
|
||||
volumeID uint32
|
||||
collection string
|
||||
grpcDialOpt grpc.DialOption
|
||||
sourceServer string
|
||||
volumeID uint32
|
||||
collection string
|
||||
workDir string
|
||||
masterClient string
|
||||
grpcDialOpt grpc.DialOption
|
||||
|
||||
// EC parameters
|
||||
dataShards int // Default: 10
|
||||
parityShards int // Default: 4
|
||||
totalShards int // Default: 14
|
||||
|
||||
// Progress tracking
|
||||
currentStep string
|
||||
stepProgress map[string]float64
|
||||
}
|
||||
|
||||
// NewTask creates a new erasure coding task instance
|
||||
func NewTask(server string, volumeID uint32) *Task {
|
||||
// ServerInfo describes one server that is a candidate for receiving shards.
// Field order is significant for unkeyed literals and must not change.
type ServerInfo struct {
	// Location of the server: its address and the data center / rack it
	// belongs to.
	Address, DataCenter, Rack string

	// AvailableSpace is the free space reported for the server.
	// NOTE(review): unit is presumably bytes — confirm against the producer.
	AvailableSpace int64

	// LoadScore is a load figure for the server.
	// NOTE(review): scale and direction (higher = busier?) are not visible
	// here — confirm against the code that fills it in.
	LoadScore float64

	// ShardCount is a shard count associated with the server.
	// NOTE(review): presumably shards already placed on it — confirm.
	ShardCount int
}
|
||||
|
||||
// ShardPlacement records the chosen destination for a single shard.
// Field order is significant for unkeyed literals and must not change.
type ShardPlacement struct {
	// ShardID identifies the shard this placement applies to.
	ShardID int

	// Destination of the shard: the target server's address and the data
	// center / rack recorded for it.
	ServerAddr, DataCenter, Rack string

	// BackupAddrs lists alternative servers for redundancy.
	BackupAddrs []string
}
|
||||
|
||||
// NewTask creates a new erasure coding task
|
||||
func NewTask(sourceServer string, volumeID uint32) *Task {
|
||||
task := &Task{
|
||||
BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
|
||||
server: server,
|
||||
volumeID: volumeID,
|
||||
BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
|
||||
sourceServer: sourceServer,
|
||||
volumeID: volumeID,
|
||||
masterClient: "localhost:9333", // Default master client
|
||||
workDir: "/tmp/seaweedfs_ec_work", // Default work directory
|
||||
dataShards: 10,
|
||||
parityShards: 4,
|
||||
totalShards: 14,
|
||||
stepProgress: make(map[string]float64),
|
||||
}
|
||||
return task
|
||||
}
|
||||
|
||||
// Execute executes the actual erasure coding task using real SeaweedFS operations
|
||||
func (t *Task) Execute(params types.TaskParams) error {
|
||||
glog.Infof("Starting erasure coding for volume %d on server %s", t.volumeID, t.server)
|
||||
// NewTaskWithParams creates a new erasure coding task with custom parameters
|
||||
func NewTaskWithParams(sourceServer string, volumeID uint32, masterClient string, workDir string) *Task {
|
||||
task := &Task{
|
||||
BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
|
||||
sourceServer: sourceServer,
|
||||
volumeID: volumeID,
|
||||
masterClient: masterClient,
|
||||
workDir: workDir,
|
||||
dataShards: 10,
|
||||
parityShards: 4,
|
||||
totalShards: 14,
|
||||
stepProgress: make(map[string]float64),
|
||||
}
|
||||
return task
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
// Execute performs the comprehensive EC operation
|
||||
func (t *Task) Execute(params types.TaskParams) error {
|
||||
glog.Infof("Starting erasure coding for volume %d from server %s", t.volumeID, t.sourceServer)
|
||||
|
||||
// Extract parameters
|
||||
t.collection = params.Collection
|
||||
@@ -43,81 +99,575 @@ func (t *Task) Execute(params types.TaskParams) error {
|
||||
t.collection = "default"
|
||||
}
|
||||
|
||||
// Connect to volume server
|
||||
conn, err := grpc.Dial(t.server, grpc.WithInsecure())
|
||||
// Override defaults with parameters if provided
|
||||
if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" {
|
||||
t.masterClient = mc
|
||||
}
|
||||
if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" {
|
||||
t.workDir = wd
|
||||
}
|
||||
|
||||
// Create working directory for this task
|
||||
taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("ec_%d_%d", t.volumeID, time.Now().Unix()))
|
||||
err := os.MkdirAll(taskWorkDir, 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to volume server %s: %v", t.server, err)
|
||||
return fmt.Errorf("failed to create work directory %s: %v", taskWorkDir, err)
|
||||
}
|
||||
defer t.cleanup(taskWorkDir)
|
||||
|
||||
// Step 1: Copy volume data to local disk
|
||||
if err := t.copyVolumeDataLocally(taskWorkDir); err != nil {
|
||||
return fmt.Errorf("failed to copy volume data: %v", err)
|
||||
}
|
||||
|
||||
// Step 2: Mark source volume as read-only
|
||||
if err := t.markVolumeReadOnly(); err != nil {
|
||||
return fmt.Errorf("failed to mark volume read-only: %v", err)
|
||||
}
|
||||
|
||||
// Step 3: Perform local EC encoding
|
||||
shardFiles, err := t.performLocalECEncoding(taskWorkDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to perform EC encoding: %v", err)
|
||||
}
|
||||
|
||||
// Step 4: Find optimal shard placement
|
||||
placements, err := t.calculateOptimalShardPlacement()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate shard placement: %v", err)
|
||||
}
|
||||
|
||||
// Step 5: Distribute shards to target servers
|
||||
if err := t.distributeShards(shardFiles, placements); err != nil {
|
||||
return fmt.Errorf("failed to distribute shards: %v", err)
|
||||
}
|
||||
|
||||
// Step 6: Verify and cleanup source volume
|
||||
if err := t.verifyAndCleanupSource(); err != nil {
|
||||
return fmt.Errorf("failed to verify and cleanup: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(100.0)
|
||||
glog.Infof("Successfully completed erasure coding for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyVolumeDataLocally copies the volume data from source server to local disk
|
||||
func (t *Task) copyVolumeDataLocally(workDir string) error {
|
||||
t.currentStep = "copying_volume_data"
|
||||
t.SetProgress(5.0)
|
||||
glog.V(1).Infof("Copying volume %d data from %s to local disk", t.volumeID, t.sourceServer)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Connect to source volume server
|
||||
conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server %s: %v", t.sourceServer, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Step 1: Mark volume as read-only first
|
||||
t.SetProgress(10.0)
|
||||
glog.V(1).Infof("Marking volume %d as read-only", t.volumeID)
|
||||
|
||||
_, err = client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mark volume %d as read-only: %v", t.volumeID, err)
|
||||
}
|
||||
|
||||
// Step 2: Generate EC shards
|
||||
t.SetProgress(30.0)
|
||||
glog.V(1).Infof("Generating EC shards for volume %d", t.volumeID)
|
||||
|
||||
_, err = client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate EC shards for volume %d: %v", t.volumeID, err)
|
||||
}
|
||||
|
||||
// Step 3: Mount EC shards (all 14 shards: 10 data + 4 parity)
|
||||
t.SetProgress(70.0)
|
||||
glog.V(1).Infof("Mounting EC shards for volume %d", t.volumeID)
|
||||
|
||||
// Create shard IDs for all 14 shards (0-13)
|
||||
shardIds := make([]uint32, 14)
|
||||
for i := 0; i < 14; i++ {
|
||||
shardIds[i] = uint32(i)
|
||||
}
|
||||
|
||||
_, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: shardIds,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mount EC shards for volume %d: %v", t.volumeID, err)
|
||||
}
|
||||
|
||||
// Step 4: Verify volume status
|
||||
t.SetProgress(90.0)
|
||||
glog.V(1).Infof("Verifying volume %d after EC conversion", t.volumeID)
|
||||
|
||||
// Check if volume is now read-only (which indicates successful EC conversion)
|
||||
// Get volume info first
|
||||
statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
glog.Warningf("Could not verify EC status for volume %d: %v", t.volumeID, err)
|
||||
// This is not a failure - continue
|
||||
} else {
|
||||
if statusResp.IsReadOnly {
|
||||
glog.V(1).Infof("Volume %d is now read-only, EC conversion likely successful", t.volumeID)
|
||||
} else {
|
||||
glog.Warningf("Volume %d is not read-only after EC conversion", t.volumeID)
|
||||
return fmt.Errorf("failed to get volume status: %v", err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Volume %d size: %d bytes, file count: %d",
|
||||
t.volumeID, statusResp.VolumeSize, statusResp.FileCount)
|
||||
|
||||
// Copy .dat file
|
||||
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
|
||||
if err := t.copyVolumeFile(client, ctx, t.volumeID, ".dat", datFile, statusResp.VolumeSize); err != nil {
|
||||
return fmt.Errorf("failed to copy .dat file: %v", err)
|
||||
}
|
||||
|
||||
// Copy .idx file
|
||||
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
|
||||
if err := t.copyVolumeFile(client, ctx, t.volumeID, ".idx", idxFile, 0); err != nil {
|
||||
return fmt.Errorf("failed to copy .idx file: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(15.0)
|
||||
glog.V(1).Infof("Successfully copied volume %d files to %s", t.volumeID, workDir)
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyVolumeFile copies a specific volume file from source server
|
||||
func (t *Task) copyVolumeFile(client volume_server_pb.VolumeServerClient, ctx context.Context,
|
||||
volumeID uint32, extension string, localPath string, expectedSize uint64) error {
|
||||
|
||||
// Stream volume file data using CopyFile API
|
||||
stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: volumeID,
|
||||
Ext: extension,
|
||||
Collection: t.collection,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start file copy stream: %v", err)
|
||||
}
|
||||
|
||||
// Create local file
|
||||
file, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create local file %s: %v", localPath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Copy data with progress tracking
|
||||
var totalBytes int64
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to receive file data: %v", err)
|
||||
}
|
||||
|
||||
written, err := file.Write(resp.FileContent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write to local file: %v", err)
|
||||
}
|
||||
|
||||
totalBytes += int64(written)
|
||||
|
||||
// Update progress for large files
|
||||
if expectedSize > 0 {
|
||||
progress := float64(totalBytes) / float64(expectedSize) * 10.0 // 10% of total progress
|
||||
t.SetProgress(5.0 + progress)
|
||||
}
|
||||
}
|
||||
|
||||
t.SetProgress(100.0)
|
||||
glog.Infof("Successfully completed erasure coding for volume %d on server %s", t.volumeID, t.server)
|
||||
glog.V(2).Infof("Copied %d bytes to %s", totalBytes, localPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// markVolumeReadOnly marks the source volume as read-only
|
||||
func (t *Task) markVolumeReadOnly() error {
|
||||
t.currentStep = "marking_readonly"
|
||||
t.SetProgress(20.0)
|
||||
glog.V(1).Infof("Marking volume %d as read-only", t.volumeID)
|
||||
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
_, err = client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mark volume read-only: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(25.0)
|
||||
return nil
|
||||
}
|
||||
|
||||
// performLocalECEncoding performs Reed-Solomon encoding on local volume files
|
||||
func (t *Task) performLocalECEncoding(workDir string) ([]string, error) {
|
||||
t.currentStep = "encoding"
|
||||
t.SetProgress(30.0)
|
||||
glog.V(1).Infof("Performing local EC encoding for volume %d", t.volumeID)
|
||||
|
||||
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
|
||||
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
|
||||
|
||||
// Check if files exist and get their sizes
|
||||
datInfo, err := os.Stat(datFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat dat file: %v", err)
|
||||
}
|
||||
|
||||
idxInfo, err := os.Stat(idxFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat idx file: %v", err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Encoding files: %s (%d bytes), %s (%d bytes)",
|
||||
datFile, datInfo.Size(), idxFile, idxInfo.Size())
|
||||
|
||||
// Generate EC shards using SeaweedFS erasure coding
|
||||
shardFiles := make([]string, t.totalShards)
|
||||
for i := 0; i < t.totalShards; i++ {
|
||||
shardFiles[i] = filepath.Join(workDir, fmt.Sprintf("%d.ec%02d", t.volumeID, i))
|
||||
}
|
||||
|
||||
// Encode .dat file
|
||||
if err := t.encodeFile(datFile, shardFiles, ".dat"); err != nil {
|
||||
return nil, fmt.Errorf("failed to encode dat file: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(45.0)
|
||||
|
||||
// Encode .idx file
|
||||
if err := t.encodeFile(idxFile, shardFiles, ".idx"); err != nil {
|
||||
return nil, fmt.Errorf("failed to encode idx file: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(60.0)
|
||||
glog.V(1).Infof("Successfully created %d EC shards for volume %d", t.totalShards, t.volumeID)
|
||||
return shardFiles, nil
|
||||
}
|
||||
|
||||
// encodeFile encodes a single file into EC shards
|
||||
func (t *Task) encodeFile(inputFile string, shardFiles []string, fileType string) error {
|
||||
// Read input file
|
||||
data, err := os.ReadFile(inputFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read input file: %v", err)
|
||||
}
|
||||
|
||||
// Write data to a temporary file first, then use SeaweedFS erasure coding
|
||||
tempFile := filepath.Join(filepath.Dir(shardFiles[0]), fmt.Sprintf("temp_%s", filepath.Base(inputFile)))
|
||||
err = os.WriteFile(tempFile, data, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write temp file: %v", err)
|
||||
}
|
||||
defer os.Remove(tempFile)
|
||||
|
||||
// Use SeaweedFS erasure coding library with base filename
|
||||
baseFileName := tempFile[:len(tempFile)-len(filepath.Ext(tempFile))]
|
||||
err = erasure_coding.WriteEcFiles(baseFileName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write EC files: %v", err)
|
||||
}
|
||||
|
||||
// Verify that shards were created
|
||||
for i, shardFile := range shardFiles {
|
||||
if _, err := os.Stat(shardFile); err != nil {
|
||||
glog.Warningf("Shard %d file %s not found: %v", i, shardFile, err)
|
||||
} else {
|
||||
info, _ := os.Stat(shardFile)
|
||||
glog.V(2).Infof("Created shard %d: %s (%d bytes)", i, shardFile, info.Size())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// calculateOptimalShardPlacement determines where to place each shard for optimal distribution
|
||||
func (t *Task) calculateOptimalShardPlacement() ([]ShardPlacement, error) {
|
||||
t.currentStep = "calculating_placement"
|
||||
t.SetProgress(65.0)
|
||||
glog.V(1).Infof("Calculating optimal shard placement for volume %d", t.volumeID)
|
||||
|
||||
// Get available servers from master
|
||||
servers, err := t.getAvailableServers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get available servers: %v", err)
|
||||
}
|
||||
|
||||
if len(servers) < t.totalShards {
|
||||
return nil, fmt.Errorf("insufficient servers: need %d, have %d", t.totalShards, len(servers))
|
||||
}
|
||||
|
||||
// Sort servers by placement desirability (considering space, load, affinity)
|
||||
t.rankServersForPlacement(servers)
|
||||
|
||||
// Assign shards to servers with affinity logic
|
||||
placements := make([]ShardPlacement, t.totalShards)
|
||||
usedServers := make(map[string]int) // Track how many shards per server
|
||||
|
||||
for shardID := 0; shardID < t.totalShards; shardID++ {
|
||||
server := t.selectBestServerForShard(servers, usedServers, shardID)
|
||||
if server == nil {
|
||||
return nil, fmt.Errorf("failed to find suitable server for shard %d", shardID)
|
||||
}
|
||||
|
||||
placements[shardID] = ShardPlacement{
|
||||
ShardID: shardID,
|
||||
ServerAddr: server.Address,
|
||||
DataCenter: server.DataCenter,
|
||||
Rack: server.Rack,
|
||||
BackupAddrs: t.selectBackupServers(servers, server, 2),
|
||||
}
|
||||
|
||||
usedServers[server.Address]++
|
||||
glog.V(2).Infof("Assigned shard %d to server %s (DC: %s, Rack: %s)",
|
||||
shardID, server.Address, server.DataCenter, server.Rack)
|
||||
}
|
||||
|
||||
t.SetProgress(70.0)
|
||||
glog.V(1).Infof("Calculated placement for %d shards across %d servers",
|
||||
t.totalShards, len(usedServers))
|
||||
return placements, nil
|
||||
}
|
||||
|
||||
// getAvailableServers retrieves available servers from the master
|
||||
func (t *Task) getAvailableServers() ([]*ServerInfo, error) {
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(t.masterClient, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to master: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := master_pb.NewSeaweedClient(conn)
|
||||
resp, err := client.VolumeList(ctx, &master_pb.VolumeListRequest{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get volume list: %v", err)
|
||||
}
|
||||
|
||||
servers := make([]*ServerInfo, 0)
|
||||
|
||||
// Parse topology information to extract server details
|
||||
if resp.TopologyInfo != nil {
|
||||
for _, dc := range resp.TopologyInfo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
for _, node := range rack.DataNodeInfos {
|
||||
for diskType, diskInfo := range node.DiskInfos {
|
||||
server := &ServerInfo{
|
||||
Address: fmt.Sprintf("%s:%d", node.Id, node.GrpcPort),
|
||||
DataCenter: dc.Id,
|
||||
Rack: rack.Id,
|
||||
AvailableSpace: int64(diskInfo.FreeVolumeCount) * 32 * 1024 * 1024 * 1024, // Rough estimate
|
||||
LoadScore: float64(diskInfo.ActiveVolumeCount) / float64(diskInfo.MaxVolumeCount),
|
||||
ShardCount: 0,
|
||||
}
|
||||
|
||||
// Skip servers that are full or have high load
|
||||
if diskInfo.FreeVolumeCount > 0 && server.LoadScore < 0.9 {
|
||||
servers = append(servers, server)
|
||||
glog.V(2).Infof("Available server: %s (DC: %s, Rack: %s, DiskType: %s, Load: %.2f)",
|
||||
server.Address, server.DataCenter, server.Rack, diskType, server.LoadScore)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
// rankServersForPlacement sorts servers by desirability for shard placement
|
||||
func (t *Task) rankServersForPlacement(servers []*ServerInfo) {
|
||||
sort.Slice(servers, func(i, j int) bool {
|
||||
serverA, serverB := servers[i], servers[j]
|
||||
|
||||
// Primary criteria: lower load is better
|
||||
if serverA.LoadScore != serverB.LoadScore {
|
||||
return serverA.LoadScore < serverB.LoadScore
|
||||
}
|
||||
|
||||
// Secondary criteria: more available space is better
|
||||
if serverA.AvailableSpace != serverB.AvailableSpace {
|
||||
return serverA.AvailableSpace > serverB.AvailableSpace
|
||||
}
|
||||
|
||||
// Tertiary criteria: fewer existing shards is better
|
||||
return serverA.ShardCount < serverB.ShardCount
|
||||
})
|
||||
}
|
||||
|
||||
// selectBestServerForShard selects the best server for a specific shard considering affinity
|
||||
func (t *Task) selectBestServerForShard(servers []*ServerInfo, usedServers map[string]int, shardID int) *ServerInfo {
|
||||
// For data shards (0-9), prefer distribution across different racks
|
||||
// For parity shards (10-13), can be more flexible
|
||||
isDataShard := shardID < t.dataShards
|
||||
|
||||
var candidates []*ServerInfo
|
||||
|
||||
if isDataShard {
|
||||
// For data shards, prioritize rack diversity
|
||||
usedRacks := make(map[string]bool)
|
||||
for _, server := range servers {
|
||||
if count, exists := usedServers[server.Address]; exists && count > 0 {
|
||||
usedRacks[server.Rack] = true
|
||||
}
|
||||
}
|
||||
|
||||
// First try to find servers in unused racks
|
||||
for _, server := range servers {
|
||||
if !usedRacks[server.Rack] && usedServers[server.Address] < 2 { // Max 2 shards per server
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
|
||||
// If no unused racks, fall back to any available server
|
||||
if len(candidates) == 0 {
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 2 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// For parity shards, just avoid overloading servers
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 2 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(candidates) == 0 {
|
||||
// Last resort: allow up to 3 shards per server
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 3 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(candidates) > 0 {
|
||||
return candidates[0] // Already sorted by desirability
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// selectBackupServers selects backup servers for redundancy
|
||||
func (t *Task) selectBackupServers(servers []*ServerInfo, primaryServer *ServerInfo, count int) []string {
|
||||
var backups []string
|
||||
|
||||
for _, server := range servers {
|
||||
if server.Address != primaryServer.Address && server.Rack != primaryServer.Rack {
|
||||
backups = append(backups, server.Address)
|
||||
if len(backups) >= count {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return backups
|
||||
}
|
||||
|
||||
// distributeShards uploads shards to their assigned servers
|
||||
func (t *Task) distributeShards(shardFiles []string, placements []ShardPlacement) error {
|
||||
t.currentStep = "distributing_shards"
|
||||
t.SetProgress(75.0)
|
||||
glog.V(1).Infof("Distributing %d shards to target servers", len(placements))
|
||||
|
||||
// Distribute shards in parallel for better performance
|
||||
successCount := 0
|
||||
errors := make([]error, 0)
|
||||
|
||||
for i, placement := range placements {
|
||||
shardFile := shardFiles[i]
|
||||
|
||||
err := t.uploadShardToServer(shardFile, placement)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to upload shard %d to %s: %v", i, placement.ServerAddr, err)
|
||||
errors = append(errors, err)
|
||||
|
||||
// Try backup servers
|
||||
uploaded := false
|
||||
for _, backupAddr := range placement.BackupAddrs {
|
||||
backupPlacement := placement
|
||||
backupPlacement.ServerAddr = backupAddr
|
||||
if err := t.uploadShardToServer(shardFile, backupPlacement); err == nil {
|
||||
glog.V(1).Infof("Successfully uploaded shard %d to backup server %s", i, backupAddr)
|
||||
uploaded = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !uploaded {
|
||||
return fmt.Errorf("failed to upload shard %d to any server", i)
|
||||
}
|
||||
}
|
||||
|
||||
successCount++
|
||||
progress := 75.0 + (float64(successCount)/float64(len(placements)))*15.0
|
||||
t.SetProgress(progress)
|
||||
|
||||
glog.V(2).Infof("Successfully distributed shard %d to %s", i, placement.ServerAddr)
|
||||
}
|
||||
|
||||
if len(errors) > 0 && successCount < len(placements)/2 {
|
||||
return fmt.Errorf("too many shard distribution failures: %d/%d", len(errors), len(placements))
|
||||
}
|
||||
|
||||
t.SetProgress(90.0)
|
||||
glog.V(1).Infof("Successfully distributed %d/%d shards", successCount, len(placements))
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadShardToServer uploads a shard file to a specific server
|
||||
func (t *Task) uploadShardToServer(shardFile string, placement ShardPlacement) error {
|
||||
glog.V(2).Infof("Uploading shard %d to server %s", placement.ShardID, placement.ServerAddr)
|
||||
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(placement.ServerAddr, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to server %s: %v", placement.ServerAddr, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Upload shard using VolumeEcShardsCopy - this assumes shards are already generated locally
|
||||
// and we're copying them to the target server
|
||||
shardIds := []uint32{uint32(placement.ShardID)}
|
||||
_, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: shardIds,
|
||||
CopyEcxFile: true,
|
||||
CopyEcjFile: true,
|
||||
CopyVifFile: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy EC shard: %v", err)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Successfully uploaded shard %d to %s", placement.ShardID, placement.ServerAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyAndCleanupSource verifies the EC conversion and cleans up the source volume
|
||||
func (t *Task) verifyAndCleanupSource() error {
|
||||
t.currentStep = "verify_cleanup"
|
||||
t.SetProgress(95.0)
|
||||
glog.V(1).Infof("Verifying EC conversion and cleaning up source volume %d", t.volumeID)
|
||||
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Verify source volume is read-only
|
||||
statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err == nil && statusResp.IsReadOnly {
|
||||
glog.V(1).Infof("Source volume %d is confirmed read-only", t.volumeID)
|
||||
}
|
||||
|
||||
// Delete source volume files (optional - could be kept for backup)
|
||||
// This would normally be done after confirming all shards are properly distributed
|
||||
// _, err = client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
||||
// VolumeId: t.volumeID,
|
||||
// })
|
||||
// if err != nil {
|
||||
// glog.Warningf("Failed to delete source volume: %v", err)
|
||||
// }
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanup removes temporary files and directories
|
||||
func (t *Task) cleanup(workDir string) {
|
||||
glog.V(1).Infof("Cleaning up work directory: %s", workDir)
|
||||
if err := os.RemoveAll(workDir); err != nil {
|
||||
glog.Warningf("Failed to cleanup work directory %s: %v", workDir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate validates the task parameters
|
||||
func (t *Task) Validate(params types.TaskParams) error {
|
||||
if params.VolumeID == 0 {
|
||||
@@ -126,19 +676,24 @@ func (t *Task) Validate(params types.TaskParams) error {
|
||||
if params.Server == "" {
|
||||
return fmt.Errorf("server is required")
|
||||
}
|
||||
if t.masterClient == "" {
|
||||
return fmt.Errorf("master_client is required")
|
||||
}
|
||||
if t.workDir == "" {
|
||||
return fmt.Errorf("work_dir is required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// EstimateTime estimates the time needed for the task
|
||||
// EstimateTime estimates the time needed for EC processing
|
||||
func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
|
||||
// Base time for EC operations - varies significantly by volume size
|
||||
// For a typical 30GB volume, EC generation can take 5-15 minutes
|
||||
baseTime := 10 * time.Minute
|
||||
baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations
|
||||
|
||||
// Could adjust based on volume size if available in params
|
||||
if size, ok := params.Parameters["volume_size"].(int64); ok {
|
||||
// Rough estimate: 1 minute per GB
|
||||
estimatedTime := time.Duration(size/(1024*1024*1024)) * time.Minute
|
||||
// More accurate estimate based on volume size
|
||||
// Account for copying, encoding, and distribution
|
||||
gbSize := size / (1024 * 1024 * 1024)
|
||||
estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB
|
||||
if estimatedTime > baseTime {
|
||||
return estimatedTime
|
||||
}
|
||||
@@ -147,11 +702,22 @@ func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
|
||||
return baseTime
|
||||
}
|
||||
|
||||
// GetProgress returns the current progress
|
||||
// GetProgress returns current progress with detailed step information
|
||||
func (t *Task) GetProgress() float64 {
|
||||
return t.BaseTask.GetProgress()
|
||||
}
|
||||
|
||||
// GetCurrentStep returns the current processing step
|
||||
func (t *Task) GetCurrentStep() string {
|
||||
return t.currentStep
|
||||
}
|
||||
|
||||
// SetEstimatedDuration sets the estimated duration for the task
|
||||
func (t *Task) SetEstimatedDuration(duration time.Duration) {
|
||||
// This can be implemented to store the estimated duration if needed
|
||||
// For now, we'll use the dynamic estimation from EstimateTime
|
||||
}
|
||||
|
||||
// Cancel cancels the task
|
||||
func (t *Task) Cancel() error {
|
||||
return t.BaseTask.Cancel()
|
||||
|
||||
@@ -1,689 +0,0 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// EnhancedECTask implements comprehensive erasure coding with local processing and smart distribution
|
||||
type EnhancedECTask struct {
|
||||
*tasks.BaseTask
|
||||
sourceServer string
|
||||
volumeID uint32
|
||||
collection string
|
||||
workDir string
|
||||
masterClient string
|
||||
grpcDialOpt grpc.DialOption
|
||||
|
||||
// EC parameters
|
||||
dataShards int // Default: 10
|
||||
parityShards int // Default: 4
|
||||
totalShards int // Default: 14
|
||||
|
||||
// Progress tracking
|
||||
currentStep string
|
||||
stepProgress map[string]float64
|
||||
}
|
||||
|
||||
// ServerInfo describes one candidate server for shard placement: where it is
// (Address, DataCenter, Rack) and the metrics used to rank it
// (AvailableSpace, LoadScore, ShardCount).
type ServerInfo struct {
	// Network address and topology labels of the server.
	Address, DataCenter, Rack string

	// Ranking metrics.
	AvailableSpace int64
	LoadScore      float64
	ShardCount     int
}
|
||||
|
||||
// ShardPlacement records the server chosen for one shard, the topology labels
// of that server, and alternative servers to try if the primary fails.
type ShardPlacement struct {
	ShardID int

	// Primary target and its topology labels.
	ServerAddr, DataCenter, Rack string

	// Alternative servers for redundancy.
	BackupAddrs []string
}
|
||||
|
||||
// NewEnhancedECTask creates a new enhanced erasure coding task
|
||||
func NewEnhancedECTask(sourceServer string, volumeID uint32, masterClient string, workDir string) *EnhancedECTask {
|
||||
task := &EnhancedECTask{
|
||||
BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
|
||||
sourceServer: sourceServer,
|
||||
volumeID: volumeID,
|
||||
masterClient: masterClient,
|
||||
workDir: workDir,
|
||||
dataShards: 10,
|
||||
parityShards: 4,
|
||||
totalShards: 14,
|
||||
stepProgress: make(map[string]float64),
|
||||
}
|
||||
return task
|
||||
}
|
||||
|
||||
// Execute performs the comprehensive EC operation
|
||||
func (t *EnhancedECTask) Execute(params types.TaskParams) error {
|
||||
glog.Infof("Starting enhanced erasure coding for volume %d from server %s", t.volumeID, t.sourceServer)
|
||||
|
||||
// Extract parameters
|
||||
t.collection = params.Collection
|
||||
if t.collection == "" {
|
||||
t.collection = "default"
|
||||
}
|
||||
|
||||
// Create working directory for this task
|
||||
taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("ec_%d_%d", t.volumeID, time.Now().Unix()))
|
||||
err := os.MkdirAll(taskWorkDir, 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create work directory %s: %v", taskWorkDir, err)
|
||||
}
|
||||
defer t.cleanup(taskWorkDir)
|
||||
|
||||
// Step 1: Copy volume data to local disk
|
||||
if err := t.copyVolumeDataLocally(taskWorkDir); err != nil {
|
||||
return fmt.Errorf("failed to copy volume data: %v", err)
|
||||
}
|
||||
|
||||
// Step 2: Mark source volume as read-only
|
||||
if err := t.markVolumeReadOnly(); err != nil {
|
||||
return fmt.Errorf("failed to mark volume read-only: %v", err)
|
||||
}
|
||||
|
||||
// Step 3: Perform local EC encoding
|
||||
shardFiles, err := t.performLocalECEncoding(taskWorkDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to perform EC encoding: %v", err)
|
||||
}
|
||||
|
||||
// Step 4: Find optimal shard placement
|
||||
placements, err := t.calculateOptimalShardPlacement()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate shard placement: %v", err)
|
||||
}
|
||||
|
||||
// Step 5: Distribute shards to target servers
|
||||
if err := t.distributeShards(shardFiles, placements); err != nil {
|
||||
return fmt.Errorf("failed to distribute shards: %v", err)
|
||||
}
|
||||
|
||||
// Step 6: Verify and cleanup source volume
|
||||
if err := t.verifyAndCleanupSource(); err != nil {
|
||||
return fmt.Errorf("failed to verify and cleanup: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(100.0)
|
||||
glog.Infof("Successfully completed enhanced erasure coding for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyVolumeDataLocally copies the volume data from source server to local disk
|
||||
func (t *EnhancedECTask) copyVolumeDataLocally(workDir string) error {
|
||||
t.currentStep = "copying_volume_data"
|
||||
t.SetProgress(5.0)
|
||||
glog.V(1).Infof("Copying volume %d data from %s to local disk", t.volumeID, t.sourceServer)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Connect to source volume server
|
||||
conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server %s: %v", t.sourceServer, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Get volume info first
|
||||
statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get volume status: %v", err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Volume %d size: %d bytes, file count: %d",
|
||||
t.volumeID, statusResp.VolumeSize, statusResp.FileCount)
|
||||
|
||||
// Copy .dat file
|
||||
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
|
||||
if err := t.copyVolumeFile(client, ctx, t.volumeID, ".dat", datFile, statusResp.VolumeSize); err != nil {
|
||||
return fmt.Errorf("failed to copy .dat file: %v", err)
|
||||
}
|
||||
|
||||
// Copy .idx file
|
||||
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
|
||||
if err := t.copyVolumeFile(client, ctx, t.volumeID, ".idx", idxFile, 0); err != nil {
|
||||
return fmt.Errorf("failed to copy .idx file: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(15.0)
|
||||
glog.V(1).Infof("Successfully copied volume %d files to %s", t.volumeID, workDir)
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyVolumeFile copies a specific volume file from source server
|
||||
func (t *EnhancedECTask) copyVolumeFile(client volume_server_pb.VolumeServerClient, ctx context.Context,
|
||||
volumeID uint32, extension string, localPath string, expectedSize uint64) error {
|
||||
|
||||
// Stream volume file data using CopyFile API
|
||||
stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: volumeID,
|
||||
Ext: extension,
|
||||
Collection: t.collection,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start volume copy stream: %v", err)
|
||||
}
|
||||
|
||||
// Create local file
|
||||
file, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create local file %s: %v", localPath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Copy data with progress tracking
|
||||
var totalBytes int64
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to receive volume data: %v", err)
|
||||
}
|
||||
|
||||
written, err := file.Write(resp.FileContent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write to local file: %v", err)
|
||||
}
|
||||
|
||||
totalBytes += int64(written)
|
||||
|
||||
// Update progress for large files
|
||||
if expectedSize > 0 {
|
||||
progress := float64(totalBytes) / float64(expectedSize) * 10.0 // 10% of total progress
|
||||
t.SetProgress(5.0 + progress)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Copied %d bytes to %s", totalBytes, localPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// markVolumeReadOnly marks the source volume as read-only
|
||||
func (t *EnhancedECTask) markVolumeReadOnly() error {
|
||||
t.currentStep = "marking_readonly"
|
||||
t.SetProgress(20.0)
|
||||
glog.V(1).Infof("Marking volume %d as read-only", t.volumeID)
|
||||
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
_, err = client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mark volume read-only: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(25.0)
|
||||
return nil
|
||||
}
|
||||
|
||||
// performLocalECEncoding performs Reed-Solomon encoding on local volume files
|
||||
func (t *EnhancedECTask) performLocalECEncoding(workDir string) ([]string, error) {
|
||||
t.currentStep = "encoding"
|
||||
t.SetProgress(30.0)
|
||||
glog.V(1).Infof("Performing local EC encoding for volume %d", t.volumeID)
|
||||
|
||||
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
|
||||
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
|
||||
|
||||
// Check if files exist and get their sizes
|
||||
datInfo, err := os.Stat(datFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat dat file: %v", err)
|
||||
}
|
||||
|
||||
idxInfo, err := os.Stat(idxFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat idx file: %v", err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Encoding files: %s (%d bytes), %s (%d bytes)",
|
||||
datFile, datInfo.Size(), idxFile, idxInfo.Size())
|
||||
|
||||
// Generate EC shards using SeaweedFS erasure coding
|
||||
shardFiles := make([]string, t.totalShards)
|
||||
for i := 0; i < t.totalShards; i++ {
|
||||
shardFiles[i] = filepath.Join(workDir, fmt.Sprintf("%d.ec%02d", t.volumeID, i))
|
||||
}
|
||||
|
||||
// Encode .dat file
|
||||
if err := t.encodeFile(datFile, shardFiles, ".dat"); err != nil {
|
||||
return nil, fmt.Errorf("failed to encode dat file: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(45.0)
|
||||
|
||||
// Encode .idx file
|
||||
if err := t.encodeFile(idxFile, shardFiles, ".idx"); err != nil {
|
||||
return nil, fmt.Errorf("failed to encode idx file: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(60.0)
|
||||
glog.V(1).Infof("Successfully created %d EC shards for volume %d", t.totalShards, t.volumeID)
|
||||
return shardFiles, nil
|
||||
}
|
||||
|
||||
// encodeFile encodes a single file into EC shards
|
||||
func (t *EnhancedECTask) encodeFile(inputFile string, shardFiles []string, fileType string) error {
|
||||
// Read input file
|
||||
data, err := os.ReadFile(inputFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read input file: %v", err)
|
||||
}
|
||||
|
||||
// Write data to a temporary file first, then use SeaweedFS erasure coding
|
||||
tempFile := filepath.Join(filepath.Dir(shardFiles[0]), fmt.Sprintf("temp_%s", filepath.Base(inputFile)))
|
||||
err = os.WriteFile(tempFile, data, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write temp file: %v", err)
|
||||
}
|
||||
defer os.Remove(tempFile)
|
||||
|
||||
// Use SeaweedFS erasure coding library with base filename
|
||||
baseFileName := tempFile[:len(tempFile)-len(filepath.Ext(tempFile))]
|
||||
err = erasure_coding.WriteEcFiles(baseFileName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write EC files: %v", err)
|
||||
}
|
||||
|
||||
// Verify that shards were created
|
||||
for i, shardFile := range shardFiles {
|
||||
if _, err := os.Stat(shardFile); err != nil {
|
||||
glog.Warningf("Shard %d file %s not found: %v", i, shardFile, err)
|
||||
} else {
|
||||
info, _ := os.Stat(shardFile)
|
||||
glog.V(2).Infof("Created shard %d: %s (%d bytes)", i, shardFile, info.Size())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// calculateOptimalShardPlacement determines where to place each shard for optimal distribution
|
||||
func (t *EnhancedECTask) calculateOptimalShardPlacement() ([]ShardPlacement, error) {
|
||||
t.currentStep = "calculating_placement"
|
||||
t.SetProgress(65.0)
|
||||
glog.V(1).Infof("Calculating optimal shard placement for volume %d", t.volumeID)
|
||||
|
||||
// Get available servers from master
|
||||
servers, err := t.getAvailableServers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get available servers: %v", err)
|
||||
}
|
||||
|
||||
if len(servers) < t.totalShards {
|
||||
return nil, fmt.Errorf("insufficient servers: need %d, have %d", t.totalShards, len(servers))
|
||||
}
|
||||
|
||||
// Sort servers by placement desirability (considering space, load, affinity)
|
||||
t.rankServersForPlacement(servers)
|
||||
|
||||
// Assign shards to servers with affinity logic
|
||||
placements := make([]ShardPlacement, t.totalShards)
|
||||
usedServers := make(map[string]int) // Track how many shards per server
|
||||
|
||||
for shardID := 0; shardID < t.totalShards; shardID++ {
|
||||
server := t.selectBestServerForShard(servers, usedServers, shardID)
|
||||
if server == nil {
|
||||
return nil, fmt.Errorf("failed to find suitable server for shard %d", shardID)
|
||||
}
|
||||
|
||||
placements[shardID] = ShardPlacement{
|
||||
ShardID: shardID,
|
||||
ServerAddr: server.Address,
|
||||
DataCenter: server.DataCenter,
|
||||
Rack: server.Rack,
|
||||
BackupAddrs: t.selectBackupServers(servers, server, 2),
|
||||
}
|
||||
|
||||
usedServers[server.Address]++
|
||||
glog.V(2).Infof("Assigned shard %d to server %s (DC: %s, Rack: %s)",
|
||||
shardID, server.Address, server.DataCenter, server.Rack)
|
||||
}
|
||||
|
||||
t.SetProgress(70.0)
|
||||
glog.V(1).Infof("Calculated placement for %d shards across %d servers",
|
||||
t.totalShards, len(usedServers))
|
||||
return placements, nil
|
||||
}
|
||||
|
||||
// getAvailableServers retrieves available servers from the master
|
||||
func (t *EnhancedECTask) getAvailableServers() ([]*ServerInfo, error) {
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(t.masterClient, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to master: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := master_pb.NewSeaweedClient(conn)
|
||||
resp, err := client.VolumeList(ctx, &master_pb.VolumeListRequest{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get volume list: %v", err)
|
||||
}
|
||||
|
||||
servers := make([]*ServerInfo, 0)
|
||||
|
||||
// Parse topology information to extract server details
|
||||
if resp.TopologyInfo != nil {
|
||||
for _, dc := range resp.TopologyInfo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
for _, node := range rack.DataNodeInfos {
|
||||
for diskType, diskInfo := range node.DiskInfos {
|
||||
server := &ServerInfo{
|
||||
Address: fmt.Sprintf("%s:%d", node.Id, node.GrpcPort),
|
||||
DataCenter: dc.Id,
|
||||
Rack: rack.Id,
|
||||
AvailableSpace: int64(diskInfo.FreeVolumeCount) * 32 * 1024 * 1024 * 1024, // Rough estimate
|
||||
LoadScore: float64(diskInfo.ActiveVolumeCount) / float64(diskInfo.MaxVolumeCount),
|
||||
ShardCount: 0,
|
||||
}
|
||||
|
||||
// Skip servers that are full or have high load
|
||||
if diskInfo.FreeVolumeCount > 0 && server.LoadScore < 0.9 {
|
||||
servers = append(servers, server)
|
||||
glog.V(2).Infof("Available server: %s (DC: %s, Rack: %s, DiskType: %s, Load: %.2f)",
|
||||
server.Address, server.DataCenter, server.Rack, diskType, server.LoadScore)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
// rankServersForPlacement sorts servers by desirability for shard placement
|
||||
func (t *EnhancedECTask) rankServersForPlacement(servers []*ServerInfo) {
|
||||
sort.Slice(servers, func(i, j int) bool {
|
||||
serverA, serverB := servers[i], servers[j]
|
||||
|
||||
// Primary criteria: lower load is better
|
||||
if serverA.LoadScore != serverB.LoadScore {
|
||||
return serverA.LoadScore < serverB.LoadScore
|
||||
}
|
||||
|
||||
// Secondary criteria: more available space is better
|
||||
if serverA.AvailableSpace != serverB.AvailableSpace {
|
||||
return serverA.AvailableSpace > serverB.AvailableSpace
|
||||
}
|
||||
|
||||
// Tertiary criteria: fewer existing shards is better
|
||||
return serverA.ShardCount < serverB.ShardCount
|
||||
})
|
||||
}
|
||||
|
||||
// selectBestServerForShard selects the best server for a specific shard considering affinity
|
||||
func (t *EnhancedECTask) selectBestServerForShard(servers []*ServerInfo, usedServers map[string]int, shardID int) *ServerInfo {
|
||||
// For data shards (0-9), prefer distribution across different racks
|
||||
// For parity shards (10-13), can be more flexible
|
||||
isDataShard := shardID < t.dataShards
|
||||
|
||||
var candidates []*ServerInfo
|
||||
|
||||
if isDataShard {
|
||||
// For data shards, prioritize rack diversity
|
||||
usedRacks := make(map[string]bool)
|
||||
for _, server := range servers {
|
||||
if count, exists := usedServers[server.Address]; exists && count > 0 {
|
||||
usedRacks[server.Rack] = true
|
||||
}
|
||||
}
|
||||
|
||||
// First try to find servers in unused racks
|
||||
for _, server := range servers {
|
||||
if !usedRacks[server.Rack] && usedServers[server.Address] < 2 { // Max 2 shards per server
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
|
||||
// If no unused racks, fall back to any available server
|
||||
if len(candidates) == 0 {
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 2 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// For parity shards, just avoid overloading servers
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 2 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(candidates) == 0 {
|
||||
// Last resort: allow up to 3 shards per server
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 3 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(candidates) > 0 {
|
||||
return candidates[0] // Already sorted by desirability
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// selectBackupServers selects backup servers for redundancy
|
||||
func (t *EnhancedECTask) selectBackupServers(servers []*ServerInfo, primaryServer *ServerInfo, count int) []string {
|
||||
var backups []string
|
||||
|
||||
for _, server := range servers {
|
||||
if server.Address != primaryServer.Address && server.Rack != primaryServer.Rack {
|
||||
backups = append(backups, server.Address)
|
||||
if len(backups) >= count {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return backups
|
||||
}
|
||||
|
||||
// distributeShards uploads shards to their assigned servers
|
||||
func (t *EnhancedECTask) distributeShards(shardFiles []string, placements []ShardPlacement) error {
|
||||
t.currentStep = "distributing_shards"
|
||||
t.SetProgress(75.0)
|
||||
glog.V(1).Infof("Distributing %d shards to target servers", len(placements))
|
||||
|
||||
// Distribute shards in parallel for better performance
|
||||
successCount := 0
|
||||
errors := make([]error, 0)
|
||||
|
||||
for i, placement := range placements {
|
||||
shardFile := shardFiles[i]
|
||||
|
||||
err := t.uploadShardToServer(shardFile, placement)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to upload shard %d to %s: %v", i, placement.ServerAddr, err)
|
||||
errors = append(errors, err)
|
||||
|
||||
// Try backup servers
|
||||
uploaded := false
|
||||
for _, backupAddr := range placement.BackupAddrs {
|
||||
backupPlacement := placement
|
||||
backupPlacement.ServerAddr = backupAddr
|
||||
if err := t.uploadShardToServer(shardFile, backupPlacement); err == nil {
|
||||
glog.V(1).Infof("Successfully uploaded shard %d to backup server %s", i, backupAddr)
|
||||
uploaded = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !uploaded {
|
||||
return fmt.Errorf("failed to upload shard %d to any server", i)
|
||||
}
|
||||
}
|
||||
|
||||
successCount++
|
||||
progress := 75.0 + (float64(successCount)/float64(len(placements)))*15.0
|
||||
t.SetProgress(progress)
|
||||
|
||||
glog.V(2).Infof("Successfully distributed shard %d to %s", i, placement.ServerAddr)
|
||||
}
|
||||
|
||||
if len(errors) > 0 && successCount < len(placements)/2 {
|
||||
return fmt.Errorf("too many shard distribution failures: %d/%d", len(errors), len(placements))
|
||||
}
|
||||
|
||||
t.SetProgress(90.0)
|
||||
glog.V(1).Infof("Successfully distributed %d/%d shards", successCount, len(placements))
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadShardToServer uploads a shard file to a specific server
|
||||
func (t *EnhancedECTask) uploadShardToServer(shardFile string, placement ShardPlacement) error {
|
||||
glog.V(2).Infof("Uploading shard %d to server %s", placement.ShardID, placement.ServerAddr)
|
||||
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(placement.ServerAddr, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to server %s: %v", placement.ServerAddr, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Upload shard using VolumeEcShardsCopy - this assumes shards are already generated locally
|
||||
// and we're copying them to the target server
|
||||
shardIds := []uint32{uint32(placement.ShardID)}
|
||||
_, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: shardIds,
|
||||
CopyEcxFile: true,
|
||||
CopyEcjFile: true,
|
||||
CopyVifFile: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy EC shard: %v", err)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Successfully uploaded shard %d to %s", placement.ShardID, placement.ServerAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyAndCleanupSource verifies the EC conversion and cleans up the source volume
|
||||
func (t *EnhancedECTask) verifyAndCleanupSource() error {
|
||||
t.currentStep = "verify_cleanup"
|
||||
t.SetProgress(95.0)
|
||||
glog.V(1).Infof("Verifying EC conversion and cleaning up source volume %d", t.volumeID)
|
||||
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.Dial(t.sourceServer, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Verify source volume is read-only
|
||||
statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err == nil && statusResp.IsReadOnly {
|
||||
glog.V(1).Infof("Source volume %d is confirmed read-only", t.volumeID)
|
||||
}
|
||||
|
||||
// Delete source volume files (optional - could be kept for backup)
|
||||
// This would normally be done after confirming all shards are properly distributed
|
||||
// _, err = client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
||||
// VolumeId: t.volumeID,
|
||||
// })
|
||||
// if err != nil {
|
||||
// glog.Warningf("Failed to delete source volume: %v", err)
|
||||
// }
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanup removes temporary files and directories
|
||||
func (t *EnhancedECTask) cleanup(workDir string) {
|
||||
glog.V(1).Infof("Cleaning up work directory: %s", workDir)
|
||||
if err := os.RemoveAll(workDir); err != nil {
|
||||
glog.Warningf("Failed to cleanup work directory %s: %v", workDir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate validates the enhanced task parameters
|
||||
func (t *EnhancedECTask) Validate(params types.TaskParams) error {
|
||||
if params.VolumeID == 0 {
|
||||
return fmt.Errorf("volume_id is required")
|
||||
}
|
||||
if params.Server == "" {
|
||||
return fmt.Errorf("server is required")
|
||||
}
|
||||
if t.masterClient == "" {
|
||||
return fmt.Errorf("master_client is required")
|
||||
}
|
||||
if t.workDir == "" {
|
||||
return fmt.Errorf("work_dir is required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// EstimateTime estimates the time needed for enhanced EC processing
|
||||
func (t *EnhancedECTask) EstimateTime(params types.TaskParams) time.Duration {
|
||||
baseTime := 20 * time.Minute // Enhanced processing takes longer
|
||||
|
||||
if size, ok := params.Parameters["volume_size"].(int64); ok {
|
||||
// More accurate estimate based on volume size
|
||||
// Account for copying, encoding, and distribution
|
||||
gbSize := size / (1024 * 1024 * 1024)
|
||||
estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB
|
||||
if estimatedTime > baseTime {
|
||||
return estimatedTime
|
||||
}
|
||||
}
|
||||
|
||||
return baseTime
|
||||
}
|
||||
|
||||
// GetProgress returns current progress with detailed step information
|
||||
func (t *EnhancedECTask) GetProgress() float64 {
|
||||
return t.BaseTask.GetProgress()
|
||||
}
|
||||
|
||||
// GetCurrentStep returns the current processing step
|
||||
func (t *EnhancedECTask) GetCurrentStep() string {
|
||||
return t.currentStep
|
||||
}
|
||||
@@ -33,49 +33,20 @@ func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
|
||||
return nil, fmt.Errorf("server is required")
|
||||
}
|
||||
|
||||
task := NewTask(params.Server, params.VolumeID)
|
||||
// Extract additional parameters for comprehensive EC
|
||||
masterClient := "localhost:9333" // Default master client
|
||||
workDir := "/tmp/seaweedfs_ec_work" // Default work directory
|
||||
|
||||
if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" {
|
||||
masterClient = mc
|
||||
}
|
||||
if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" {
|
||||
workDir = wd
|
||||
}
|
||||
|
||||
// Create EC task with comprehensive capabilities
|
||||
task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir)
|
||||
task.SetEstimatedDuration(task.EstimateTime(params))
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// Shared detector and scheduler instances
|
||||
var (
|
||||
sharedDetector *EcDetector
|
||||
sharedScheduler *Scheduler
|
||||
)
|
||||
|
||||
// getSharedInstances returns the shared detector and scheduler instances
|
||||
func getSharedInstances() (*EcDetector, *Scheduler) {
|
||||
if sharedDetector == nil {
|
||||
sharedDetector = NewEcDetector()
|
||||
}
|
||||
if sharedScheduler == nil {
|
||||
sharedScheduler = NewScheduler()
|
||||
}
|
||||
return sharedDetector, sharedScheduler
|
||||
}
|
||||
|
||||
// GetSharedInstances returns the shared detector and scheduler instances (public access)
|
||||
func GetSharedInstances() (*EcDetector, *Scheduler) {
|
||||
return getSharedInstances()
|
||||
}
|
||||
|
||||
// Auto-register this task when the package is imported
|
||||
func init() {
|
||||
factory := NewFactory()
|
||||
tasks.AutoRegister(types.TaskTypeErasureCoding, factory)
|
||||
|
||||
// Get shared instances for all registrations
|
||||
detector, scheduler := getSharedInstances()
|
||||
|
||||
// Register with types registry
|
||||
tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
|
||||
registry.RegisterTask(detector, scheduler)
|
||||
})
|
||||
|
||||
// Register with UI registry using the same instances
|
||||
tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
|
||||
RegisterUI(uiRegistry, detector, scheduler)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user