mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-28 20:50:20 +00:00
remove unused
This commit is contained in:
@@ -449,41 +449,3 @@ func (h *MaintenanceHandlers) updateMaintenanceConfig(config *maintenance.Mainte
|
||||
// Delegate to AdminServer's real persistence method
|
||||
return h.adminServer.UpdateMaintenanceConfigData(config)
|
||||
}
|
||||
|
||||
// floatPtr is a helper function to create float64 pointers
|
||||
func floatPtr(f float64) *float64 {
|
||||
return &f
|
||||
}
|
||||
|
||||
// Global templ UI registry - temporarily disabled
|
||||
// var globalTemplUIRegistry *types.UITemplRegistry
|
||||
|
||||
// initTemplUIRegistry initializes the global templ UI registry - temporarily disabled
|
||||
func initTemplUIRegistry() {
|
||||
// Temporarily disabled due to missing types
|
||||
// if globalTemplUIRegistry == nil {
|
||||
// globalTemplUIRegistry = types.NewUITemplRegistry()
|
||||
// // Register vacuum templ UI provider using shared instances
|
||||
// vacuumDetector, vacuumScheduler := vacuum.GetSharedInstances()
|
||||
// vacuum.RegisterUITempl(globalTemplUIRegistry, vacuumDetector, vacuumScheduler)
|
||||
// // Register erasure coding templ UI provider using shared instances
|
||||
// erasureCodingDetector, erasureCodingScheduler := erasure_coding.GetSharedInstances()
|
||||
// erasure_coding.RegisterUITempl(globalTemplUIRegistry, erasureCodingDetector, erasureCodingScheduler)
|
||||
// // Register balance templ UI provider using shared instances
|
||||
// balanceDetector, balanceScheduler := balance.GetSharedInstances()
|
||||
// balance.RegisterUITempl(globalTemplUIRegistry, balanceDetector, balanceScheduler)
|
||||
// }
|
||||
}
|
||||
|
||||
// getTemplUIProvider gets the templ UI provider for a task type - temporarily disabled
|
||||
func getTemplUIProvider(taskType maintenance.MaintenanceTaskType) interface{} {
|
||||
// initTemplUIRegistry()
|
||||
// Convert maintenance task type to worker task type
|
||||
// typesRegistry := tasks.GetGlobalTypesRegistry()
|
||||
// for workerTaskType := range typesRegistry.GetAllDetectors() {
|
||||
// if string(workerTaskType) == string(taskType) {
|
||||
// return globalTemplUIRegistry.GetProvider(workerTaskType)
|
||||
// }
|
||||
// }
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package balance
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -62,7 +63,7 @@ func (logic *BalanceUILogic) GetCurrentConfig() interface{} {
|
||||
func (logic *BalanceUILogic) ApplyConfig(config interface{}) error {
|
||||
balanceConfig, ok := config.(*BalanceConfig)
|
||||
if !ok {
|
||||
return nil // Will be handled by base provider fallback
|
||||
return fmt.Errorf("invalid configuration type for balance")
|
||||
}
|
||||
|
||||
// Apply to detector
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -46,25 +45,6 @@ type Task struct {
|
||||
stepProgress map[string]float64
|
||||
}
|
||||
|
||||
// ServerInfo holds information about available servers for shard placement
|
||||
type ServerInfo struct {
|
||||
Address string
|
||||
DataCenter string
|
||||
Rack string
|
||||
AvailableSpace int64
|
||||
LoadScore float64
|
||||
ShardCount int
|
||||
}
|
||||
|
||||
// ShardPlacement represents where a shard should be placed
|
||||
type ShardPlacement struct {
|
||||
ShardID int
|
||||
ServerAddr string
|
||||
DataCenter string
|
||||
Rack string
|
||||
BackupAddrs []string // Alternative servers for redundancy
|
||||
}
|
||||
|
||||
// NewTask creates a new erasure coding task
|
||||
func NewTask(sourceServer string, volumeID uint32) *Task {
|
||||
task := &Task{
|
||||
@@ -320,33 +300,6 @@ func (t *Task) copyVolumeFile(client volume_server_pb.VolumeServerClient, ctx co
|
||||
return nil
|
||||
}
|
||||
|
||||
// markVolumeReadOnly marks the source volume as read-only
|
||||
func (t *Task) markVolumeReadOnly() error {
|
||||
t.currentStep = "marking_readonly"
|
||||
t.SetProgress(20.0)
|
||||
glog.V(1).Infof("Marking volume %d as read-only", t.volumeID)
|
||||
|
||||
ctx := context.Background()
|
||||
// Convert to gRPC address
|
||||
grpcAddress := pb.ServerToGrpcAddress(t.sourceServer)
|
||||
conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
_, err = client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mark volume read-only: %v", err)
|
||||
}
|
||||
|
||||
t.SetProgress(25.0)
|
||||
return nil
|
||||
}
|
||||
|
||||
// performLocalECEncoding performs Reed-Solomon encoding on local volume files
|
||||
func (t *Task) performLocalECEncoding(workDir string) ([]string, error) {
|
||||
t.currentStep = "encoding"
|
||||
@@ -455,324 +408,6 @@ func (t *Task) performLocalECEncoding(workDir string) ([]string, error) {
|
||||
return shardFiles, nil
|
||||
}
|
||||
|
||||
// calculateOptimalShardPlacement determines where to place each shard for optimal distribution
|
||||
func (t *Task) calculateOptimalShardPlacement() ([]ShardPlacement, error) {
|
||||
t.currentStep = "calculating_placement"
|
||||
t.SetProgress(65.0)
|
||||
glog.V(1).Infof("Calculating optimal shard placement for volume %d", t.volumeID)
|
||||
|
||||
// Get available servers from master
|
||||
servers, err := t.getAvailableServers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get available servers: %v", err)
|
||||
}
|
||||
|
||||
if len(servers) < t.totalShards {
|
||||
return nil, fmt.Errorf("insufficient servers: need %d, have %d", t.totalShards, len(servers))
|
||||
}
|
||||
|
||||
// Sort servers by placement desirability (considering space, load, affinity)
|
||||
t.rankServersForPlacement(servers)
|
||||
|
||||
// Assign shards to servers with affinity logic
|
||||
placements := make([]ShardPlacement, t.totalShards)
|
||||
usedServers := make(map[string]int) // Track how many shards per server
|
||||
|
||||
for shardID := 0; shardID < t.totalShards; shardID++ {
|
||||
server := t.selectBestServerForShard(servers, usedServers, shardID)
|
||||
if server == nil {
|
||||
return nil, fmt.Errorf("failed to find suitable server for shard %d", shardID)
|
||||
}
|
||||
|
||||
placements[shardID] = ShardPlacement{
|
||||
ShardID: shardID,
|
||||
ServerAddr: server.Address,
|
||||
DataCenter: server.DataCenter,
|
||||
Rack: server.Rack,
|
||||
BackupAddrs: t.selectBackupServers(servers, server, 2),
|
||||
}
|
||||
|
||||
usedServers[server.Address]++
|
||||
glog.V(2).Infof("Assigned shard %d to server %s (DC: %s, Rack: %s)",
|
||||
shardID, server.Address, server.DataCenter, server.Rack)
|
||||
}
|
||||
|
||||
t.SetProgress(70.0)
|
||||
glog.V(1).Infof("Calculated placement for %d shards across %d servers",
|
||||
t.totalShards, len(usedServers))
|
||||
return placements, nil
|
||||
}
|
||||
|
||||
// getAvailableServers retrieves available servers from the master
|
||||
func (t *Task) getAvailableServers() ([]*ServerInfo, error) {
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.NewClient(t.masterClient, t.grpcDialOpt)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to master: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := master_pb.NewSeaweedClient(conn)
|
||||
resp, err := client.VolumeList(ctx, &master_pb.VolumeListRequest{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get volume list: %v", err)
|
||||
}
|
||||
|
||||
servers := make([]*ServerInfo, 0)
|
||||
|
||||
// Parse topology information to extract server details
|
||||
if resp.TopologyInfo != nil {
|
||||
for _, dc := range resp.TopologyInfo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
for _, node := range rack.DataNodeInfos {
|
||||
for diskType, diskInfo := range node.DiskInfos {
|
||||
server := &ServerInfo{
|
||||
Address: fmt.Sprintf("%s:%d", node.Id, node.GrpcPort),
|
||||
DataCenter: dc.Id,
|
||||
Rack: rack.Id,
|
||||
AvailableSpace: int64(diskInfo.FreeVolumeCount) * 32 * 1024 * 1024 * 1024, // Rough estimate
|
||||
LoadScore: float64(diskInfo.ActiveVolumeCount) / float64(diskInfo.MaxVolumeCount),
|
||||
ShardCount: 0,
|
||||
}
|
||||
|
||||
// Skip servers that are full or have high load
|
||||
if diskInfo.FreeVolumeCount > 0 && server.LoadScore < 0.9 {
|
||||
servers = append(servers, server)
|
||||
glog.V(2).Infof("Available server: %s (DC: %s, Rack: %s, DiskType: %s, Load: %.2f)",
|
||||
server.Address, server.DataCenter, server.Rack, diskType, server.LoadScore)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
// rankServersForPlacement sorts servers by desirability for shard placement
|
||||
func (t *Task) rankServersForPlacement(servers []*ServerInfo) {
|
||||
sort.Slice(servers, func(i, j int) bool {
|
||||
serverA, serverB := servers[i], servers[j]
|
||||
|
||||
// Primary criteria: lower load is better
|
||||
if serverA.LoadScore != serverB.LoadScore {
|
||||
return serverA.LoadScore < serverB.LoadScore
|
||||
}
|
||||
|
||||
// Secondary criteria: more available space is better
|
||||
if serverA.AvailableSpace != serverB.AvailableSpace {
|
||||
return serverA.AvailableSpace > serverB.AvailableSpace
|
||||
}
|
||||
|
||||
// Tertiary criteria: fewer existing shards is better
|
||||
return serverA.ShardCount < serverB.ShardCount
|
||||
})
|
||||
}
|
||||
|
||||
// selectBestServerForShard selects the best server for a specific shard considering affinity
|
||||
func (t *Task) selectBestServerForShard(servers []*ServerInfo, usedServers map[string]int, shardID int) *ServerInfo {
|
||||
// For data shards (0-9), prefer distribution across different racks
|
||||
// For parity shards (10-13), can be more flexible
|
||||
isDataShard := shardID < t.dataShards
|
||||
|
||||
var candidates []*ServerInfo
|
||||
|
||||
if isDataShard {
|
||||
// For data shards, prioritize rack diversity
|
||||
usedRacks := make(map[string]bool)
|
||||
for _, server := range servers {
|
||||
if count, exists := usedServers[server.Address]; exists && count > 0 {
|
||||
usedRacks[server.Rack] = true
|
||||
}
|
||||
}
|
||||
|
||||
// First try to find servers in unused racks
|
||||
for _, server := range servers {
|
||||
if !usedRacks[server.Rack] && usedServers[server.Address] < 2 { // Max 2 shards per server
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
|
||||
// If no unused racks, fall back to any available server
|
||||
if len(candidates) == 0 {
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 2 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// For parity shards, just avoid overloading servers
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 2 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(candidates) == 0 {
|
||||
// Last resort: allow up to 3 shards per server
|
||||
for _, server := range servers {
|
||||
if usedServers[server.Address] < 3 {
|
||||
candidates = append(candidates, server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(candidates) > 0 {
|
||||
return candidates[0] // Already sorted by desirability
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// selectBackupServers selects backup servers for redundancy
|
||||
func (t *Task) selectBackupServers(servers []*ServerInfo, primaryServer *ServerInfo, count int) []string {
|
||||
var backups []string
|
||||
|
||||
for _, server := range servers {
|
||||
if server.Address != primaryServer.Address && server.Rack != primaryServer.Rack {
|
||||
backups = append(backups, server.Address)
|
||||
if len(backups) >= count {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return backups
|
||||
}
|
||||
|
||||
// distributeShards uploads shards to their assigned servers
|
||||
func (t *Task) distributeShards(shardFiles []string, placements []ShardPlacement) error {
|
||||
t.currentStep = "distributing_shards"
|
||||
t.SetProgress(75.0)
|
||||
glog.V(1).Infof("Distributing %d shards to target servers", len(placements))
|
||||
|
||||
// Distribute shards in parallel for better performance
|
||||
successCount := 0
|
||||
errors := make([]error, 0)
|
||||
|
||||
for i, placement := range placements {
|
||||
shardFile := shardFiles[i]
|
||||
|
||||
err := t.uploadShardToServer(shardFile, placement)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to upload shard %d to %s: %v", i, placement.ServerAddr, err)
|
||||
errors = append(errors, err)
|
||||
|
||||
// Try backup servers
|
||||
uploaded := false
|
||||
for _, backupAddr := range placement.BackupAddrs {
|
||||
backupPlacement := placement
|
||||
backupPlacement.ServerAddr = backupAddr
|
||||
if err := t.uploadShardToServer(shardFile, backupPlacement); err == nil {
|
||||
glog.V(1).Infof("Successfully uploaded shard %d to backup server %s", i, backupAddr)
|
||||
uploaded = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !uploaded {
|
||||
return fmt.Errorf("failed to upload shard %d to any server", i)
|
||||
}
|
||||
}
|
||||
|
||||
successCount++
|
||||
progress := 75.0 + (float64(successCount)/float64(len(placements)))*15.0
|
||||
t.SetProgress(progress)
|
||||
|
||||
glog.V(2).Infof("Successfully distributed shard %d to %s", i, placement.ServerAddr)
|
||||
}
|
||||
|
||||
if len(errors) > 0 && successCount < len(placements)/2 {
|
||||
return fmt.Errorf("too many shard distribution failures: %d/%d", len(errors), len(placements))
|
||||
}
|
||||
|
||||
t.SetProgress(90.0)
|
||||
glog.V(1).Infof("Successfully distributed %d/%d shards", successCount, len(placements))
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadShardToServer uploads a shard file to a specific server
|
||||
func (t *Task) uploadShardToServer(shardFile string, placement ShardPlacement) error {
|
||||
glog.V(2).Infof("Uploading shard %d to server %s", placement.ShardID, placement.ServerAddr)
|
||||
|
||||
ctx := context.Background()
|
||||
// Convert to gRPC address
|
||||
grpcAddress := pb.ServerToGrpcAddress(placement.ServerAddr)
|
||||
conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to server %s: %v", placement.ServerAddr, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Upload shard using VolumeEcShardsCopy - this assumes shards are already generated locally
|
||||
// and we're copying them to the target server
|
||||
shardIds := []uint32{uint32(placement.ShardID)}
|
||||
_, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: shardIds,
|
||||
CopyEcxFile: true,
|
||||
CopyEcjFile: true,
|
||||
CopyVifFile: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy EC shard: %v", err)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Successfully uploaded shard %d to %s", placement.ShardID, placement.ServerAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyAndCleanupSource verifies the EC conversion and cleans up the source volume
|
||||
func (t *Task) verifyAndCleanupSource() error {
|
||||
t.currentStep = "verify_cleanup"
|
||||
t.SetProgress(95.0)
|
||||
glog.V(1).Infof("Verifying EC conversion and cleaning up source volume %d", t.volumeID)
|
||||
|
||||
ctx := context.Background()
|
||||
// Convert to gRPC address
|
||||
grpcAddress := pb.ServerToGrpcAddress(t.sourceServer)
|
||||
conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to source server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Verify source volume is read-only
|
||||
statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err == nil && statusResp.IsReadOnly {
|
||||
glog.V(1).Infof("Source volume %d is confirmed read-only", t.volumeID)
|
||||
}
|
||||
|
||||
// Delete source volume files (optional - could be kept for backup)
|
||||
// This would normally be done after confirming all shards are properly distributed
|
||||
// _, err = client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
||||
// VolumeId: t.volumeID,
|
||||
// })
|
||||
// if err != nil {
|
||||
// glog.Warningf("Failed to delete source volume: %v", err)
|
||||
// }
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanup removes temporary files and directories
|
||||
func (t *Task) cleanup(workDir string) {
|
||||
glog.V(1).Infof("Cleaning up work directory: %s", workDir)
|
||||
if err := os.RemoveAll(workDir); err != nil {
|
||||
glog.Warningf("Failed to cleanup work directory %s: %v", workDir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate validates the task parameters
|
||||
func (t *Task) Validate(params types.TaskParams) error {
|
||||
if params.VolumeID == 0 {
|
||||
@@ -1350,12 +985,6 @@ func (t *Task) uploadShardToTargetServer(shardFile string, targetServer pb.Serve
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadShardDataDirectly is no longer needed - kept for compatibility
|
||||
func (t *Task) uploadShardDataDirectly(file *os.File, targetServer pb.ServerAddress, shardId uint32, fileSize int64) error {
|
||||
// This method is deprecated in favor of gRPC streaming
|
||||
return fmt.Errorf("uploadShardDataDirectly is deprecated - use gRPC ReceiveFile instead")
|
||||
}
|
||||
|
||||
// mountShardOnServer mounts an EC shard on target server
|
||||
func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32) error {
|
||||
glog.V(1).Infof("MOUNT START: Mounting shard %d on server %s", shardId, targetServer)
|
||||
@@ -1387,99 +1016,3 @@ func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32)
|
||||
glog.V(1).Infof("MOUNT SUCCESS: Shard %d successfully mounted on %s", shardId, targetServer)
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadShardsToSourceServer uploads generated EC shards back to the source volume server
|
||||
func (t *Task) uploadShardsToSourceServer(shardFiles []string) error {
|
||||
glog.V(1).Infof("Uploading %d EC shards back to source server %s", len(shardFiles), t.sourceServer)
|
||||
|
||||
// TODO: Implement actual upload mechanism
|
||||
// This would upload the locally generated shards back to the source volume server
|
||||
// so they can be distributed using the standard VolumeEcShardsCopy mechanism
|
||||
|
||||
for i, shardFile := range shardFiles {
|
||||
info, err := os.Stat(shardFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("shard file %s not found: %v", shardFile, err)
|
||||
}
|
||||
glog.V(2).Infof("Shard %d: %s (%d bytes) ready for upload", i, shardFile, info.Size())
|
||||
}
|
||||
|
||||
// Placeholder - in production this would upload each shard file
|
||||
// to the source volume server's disk location
|
||||
glog.V(1).Infof("Placeholder: would upload %d shards to source server", len(shardFiles))
|
||||
return nil
|
||||
}
|
||||
|
||||
// distributeEcShardsFromSource distributes EC shards from source server using VolumeEcShardsCopy
|
||||
func (t *Task) distributeEcShardsFromSource() error {
|
||||
glog.V(1).Infof("Distributing EC shards from source server %s using VolumeEcShardsCopy", t.sourceServer)
|
||||
|
||||
// Get available servers for distribution
|
||||
availableServers, err := t.getAvailableServers()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get available servers: %v", err)
|
||||
}
|
||||
|
||||
if len(availableServers) < 4 {
|
||||
return fmt.Errorf("insufficient servers for EC distribution: need at least 4, found %d", len(availableServers))
|
||||
}
|
||||
|
||||
// Distribute shards using round-robin to available servers
|
||||
for shardId := 0; shardId < t.totalShards; shardId++ {
|
||||
targetServer := availableServers[shardId%len(availableServers)]
|
||||
|
||||
// Skip if target is the same as source
|
||||
if targetServer.Address == t.sourceServer {
|
||||
continue
|
||||
}
|
||||
|
||||
err := t.copyAndMountSingleShard(targetServer.Address, uint32(shardId))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy and mount shard %d to %s: %v", shardId, targetServer.Address, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyAndMountSingleShard copies a single shard from source to target and mounts it
|
||||
func (t *Task) copyAndMountSingleShard(targetServer string, shardId uint32) error {
|
||||
glog.V(1).Infof("Copying and mounting shard %d from %s to %s", shardId, t.sourceServer, targetServer)
|
||||
|
||||
ctx := context.Background()
|
||||
grpcAddress := pb.ServerToGrpcAddress(targetServer)
|
||||
conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to %s: %v", targetServer, err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(conn)
|
||||
|
||||
// Copy shard using VolumeEcShardsCopy
|
||||
_, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: []uint32{shardId},
|
||||
CopyEcxFile: shardId == 0, // Only copy .ecx file with first shard
|
||||
CopyEcjFile: true,
|
||||
CopyVifFile: shardId == 0, // Only copy .vif file with first shard
|
||||
SourceDataNode: t.sourceServer,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy shard %d: %v", shardId, err)
|
||||
}
|
||||
|
||||
// Mount shard
|
||||
_, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: []uint32{shardId},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mount shard %d: %v", shardId, err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully copied and mounted shard %d on %s", shardId, targetServer)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -25,14 +25,14 @@ var (
|
||||
_ types.PolicyConfigurableDetector = (*EcDetector)(nil)
|
||||
)
|
||||
|
||||
// NewEcDetector creates a new erasure coding detector with configurable defaults
|
||||
// NewEcDetector creates a new erasure coding detector with production defaults
|
||||
func NewEcDetector() *EcDetector {
|
||||
return &EcDetector{
|
||||
enabled: true, // Enabled for testing
|
||||
quietForSeconds: 0, // No quiet requirement for testing (was 24)
|
||||
fullnessRatio: 0.90, // 90% full by default
|
||||
minSizeMB: 50, // Minimum 50MB for testing (was 100MB)
|
||||
scanInterval: 30 * time.Second, // Faster scanning for testing
|
||||
enabled: false, // Conservative default - enable via configuration
|
||||
quietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period
|
||||
fullnessRatio: 0.90, // 90% full threshold
|
||||
minSizeMB: 100, // Minimum 100MB volume size
|
||||
scanInterval: 12 * time.Hour, // Scan every 12 hours
|
||||
collectionFilter: "", // No collection filter by default
|
||||
}
|
||||
}
|
||||
@@ -155,37 +155,50 @@ func (d *EcDetector) Configure(config map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Legacy compatibility methods for existing code
|
||||
|
||||
// SetEnabled sets whether the detector is enabled
|
||||
func (d *EcDetector) SetEnabled(enabled bool) {
|
||||
d.enabled = enabled
|
||||
}
|
||||
|
||||
func (d *EcDetector) SetVolumeAgeSeconds(seconds int) {
|
||||
d.quietForSeconds = seconds
|
||||
}
|
||||
|
||||
func (d *EcDetector) SetVolumeAgeHours(hours int) {
|
||||
d.quietForSeconds = hours * 3600 // Convert hours to seconds
|
||||
}
|
||||
|
||||
// SetQuietForSeconds sets the quiet duration threshold in seconds
|
||||
func (d *EcDetector) SetQuietForSeconds(seconds int) {
|
||||
d.quietForSeconds = seconds
|
||||
}
|
||||
|
||||
// SetFullnessRatio sets the fullness ratio threshold
|
||||
func (d *EcDetector) SetFullnessRatio(ratio float64) {
|
||||
d.fullnessRatio = ratio
|
||||
}
|
||||
|
||||
// SetCollectionFilter sets the collection filter
|
||||
func (d *EcDetector) SetCollectionFilter(filter string) {
|
||||
d.collectionFilter = filter
|
||||
}
|
||||
|
||||
// SetScanInterval sets the scan interval
|
||||
func (d *EcDetector) SetScanInterval(interval time.Duration) {
|
||||
d.scanInterval = interval
|
||||
}
|
||||
|
||||
// PolicyConfigurableDetector interface implementation
|
||||
// GetQuietForSeconds returns the current quiet duration threshold in seconds
|
||||
func (d *EcDetector) GetQuietForSeconds() int {
|
||||
return d.quietForSeconds
|
||||
}
|
||||
|
||||
// GetFullnessRatio returns the current fullness ratio threshold
|
||||
func (d *EcDetector) GetFullnessRatio() float64 {
|
||||
return d.fullnessRatio
|
||||
}
|
||||
|
||||
// GetCollectionFilter returns the current collection filter
|
||||
func (d *EcDetector) GetCollectionFilter() string {
|
||||
return d.collectionFilter
|
||||
}
|
||||
|
||||
// GetScanInterval returns the scan interval
|
||||
func (d *EcDetector) GetScanInterval() time.Duration {
|
||||
return d.scanInterval
|
||||
}
|
||||
|
||||
// ConfigureFromPolicy configures the detector from maintenance policy
|
||||
func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
|
||||
@@ -211,33 +224,3 @@ func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
|
||||
glog.Warningf("ConfigureFromPolicy received unknown policy type: %T", policy)
|
||||
}
|
||||
}
|
||||
|
||||
// GetVolumeAgeSeconds returns the current volume age threshold in seconds (legacy method)
|
||||
func (d *EcDetector) GetVolumeAgeSeconds() int {
|
||||
return d.quietForSeconds
|
||||
}
|
||||
|
||||
// GetVolumeAgeHours returns the current volume age threshold in hours (legacy method)
|
||||
func (d *EcDetector) GetVolumeAgeHours() int {
|
||||
return d.quietForSeconds / 3600 // Convert seconds to hours
|
||||
}
|
||||
|
||||
// GetQuietForSeconds returns the current quiet duration threshold in seconds
|
||||
func (d *EcDetector) GetQuietForSeconds() int {
|
||||
return d.quietForSeconds
|
||||
}
|
||||
|
||||
// GetFullnessRatio returns the current fullness ratio threshold
|
||||
func (d *EcDetector) GetFullnessRatio() float64 {
|
||||
return d.fullnessRatio
|
||||
}
|
||||
|
||||
// GetCollectionFilter returns the current collection filter
|
||||
func (d *EcDetector) GetCollectionFilter() string {
|
||||
return d.collectionFilter
|
||||
}
|
||||
|
||||
// GetScanInterval returns the scan interval
|
||||
func (d *EcDetector) GetScanInterval() time.Duration {
|
||||
return d.scanInterval
|
||||
}
|
||||
|
||||
@@ -79,25 +79,6 @@ func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
|
||||
return types.TaskPriorityLow // EC is not urgent
|
||||
}
|
||||
|
||||
// WasTaskRecentlyCompleted checks if a similar task was recently completed
|
||||
func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
|
||||
// Don't repeat EC for 24 hours
|
||||
interval := 24 * time.Hour
|
||||
cutoff := now.Add(-interval)
|
||||
|
||||
for _, completedTask := range completedTasks {
|
||||
if completedTask.Type == types.TaskTypeErasureCoding &&
|
||||
completedTask.VolumeID == task.VolumeID &&
|
||||
completedTask.Server == task.Server &&
|
||||
completedTask.Status == types.TaskStatusCompleted &&
|
||||
completedTask.CompletedAt != nil &&
|
||||
completedTask.CompletedAt.After(cutoff) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsEnabled returns whether this task type is enabled
|
||||
func (s *Scheduler) IsEnabled() bool {
|
||||
return s.enabled
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -65,7 +66,7 @@ func (logic *ErasureCodingUILogic) GetCurrentConfig() interface{} {
|
||||
func (logic *ErasureCodingUILogic) ApplyConfig(config interface{}) error {
|
||||
ecConfig, ok := config.(ErasureCodingConfig)
|
||||
if !ok {
|
||||
return nil // Will be handled by base provider fallback
|
||||
return fmt.Errorf("invalid configuration type for erasure coding")
|
||||
}
|
||||
|
||||
// Apply to detector
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package vacuum
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -65,7 +66,7 @@ func (logic *VacuumUILogic) GetCurrentConfig() interface{} {
|
||||
func (logic *VacuumUILogic) ApplyConfig(config interface{}) error {
|
||||
vacuumConfig, ok := config.(*VacuumConfig)
|
||||
if !ok {
|
||||
return nil // Will be handled by base provider fallback
|
||||
return fmt.Errorf("invalid configuration type for vacuum")
|
||||
}
|
||||
|
||||
// Apply to detector
|
||||
|
||||
Reference in New Issue
Block a user