diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 935348602..4f1069bbb 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "io" + + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -67,5 +69,5 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W return err } - return EcBalance(commandEnv, collections, *dc, rp, *maxParallelization, *applyBalancing) + return EcBalance(commandEnv, collections, *dc, rp, types.HardDriveType, *maxParallelization, *applyBalancing) } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index f2cc581da..06284edf8 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura } -func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { +func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) @@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN } // find out all volume servers with one slot left. - ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter) + ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType) sortEcNodesByFreeslotsDescending(ecNodes) return } -func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - return collectEcNodesForDC(commandEnv, "") +func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { + return collectEcNodesForDC(commandEnv, "", diskType) } func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string { @@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int { return 0 } -func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) { +func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) { eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { if selectedDataCenter != "" && selectedDataCenter != string(dc) { return } - freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) + freeEcSlots := countFreeShardSlots(dn, diskType) ecNode := &EcNode{ info: dn, dc: dc, @@ -649,6 +649,7 @@ type ecBalancer struct { replicaPlacement *super_block.ReplicaPlacement applyBalancing bool maxParallelization int + diskType types.DiskType // target disk type for EC shards (default: HardDriveType) } func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup { @@ -1194,9 +1195,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) { // collect all ec nodes - allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc) + allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType) if err != nil { return err } @@ -1210,6 +1211,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic replicaPlacement: ecReplicaPlacement, applyBalancing: applyBalancing, maxParallelization: maxParallelization, + diskType: diskType, } if len(collections) == 0 { diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go index f1f460bc6..eac3d7860 100644 --- a/weed/shell/command_ec_common_test.go +++ b/weed/shell/command_ec_common_test.go @@ -106,7 +106,7 @@ func TestParseReplicaPlacementArg(t *testing.T) { func TestEcDistribution(t *testing.T) { // find out all volume servers with one slot left. - ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "") + ecNodes, totalFreeEcSlots := collectEcVolumeServersByDc(topology1, "", types.HardDriveType) sortEcNodesByFreeslotsDescending(ecNodes) @@ -149,7 +149,7 @@ func TestPickRackToBalanceShardsInto(t *testing.T) { for _, tc := range testCases { vid, _ := needle.NewVolumeId(tc.vid) - ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType) rp, _ := super_block.NewReplicaPlacementFromString(tc.replicaPlacement) ecb := &ecBalancer{ @@ -225,7 +225,7 @@ func TestPickEcNodeToBalanceShardsInto(t *testing.T) { for _, tc := range testCases { vid, _ := needle.NewVolumeId(tc.vid) - allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "") + allEcNodes, _ := collectEcVolumeServersByDc(tc.topology, "", types.HardDriveType) ecb := &ecBalancer{ ecNodes: allEcNodes, diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 355869767..86dfab6bc 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -142,7 +142,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err) } // ...re-balance ec shards... - if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil { + if err := EcBalance(commandEnv, balanceCollections, "", rp, types.HardDriveType, *maxParallelization, *applyBalancing); err != nil { return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err) } // ...then delete original volumes using pre-collected locations. diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 79acebff1..698705853 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" ) func init() { @@ -96,7 +97,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W } // collect all ec nodes - allEcNodes, _, err := collectEcNodes(commandEnv) + allEcNodes, _, err := collectEcNodes(commandEnv, types.HardDriveType) if err != nil { return err } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index fce88d2c4..bb077c918 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -156,21 +156,29 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE } func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { - // find this ec volume server + // Evacuate EC volumes for all disk types + // We need to handle each disk type separately because shards should be moved to nodes with the same disk type // We collect topology once at the start and track capacity changes ourselves // (via freeEcSlot decrement after each move) rather than repeatedly refreshing, // which would give a false sense of correctness since topology could be stale. - ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "") + diskTypes := []types.DiskType{types.HardDriveType, types.SsdType} + + for _, diskType := range diskTypes { + ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", diskType) thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) if len(thisNodes) == 0 { - return fmt.Errorf("%s is not found in this cluster\n", volumeServer) + // This server doesn't have EC shards for this disk type, skip + continue } - // move away ec volumes + // move away ec volumes for this disk type for _, thisNode := range thisNodes { - for _, diskInfo := range thisNode.info.DiskInfos { + diskInfo, found := thisNode.info.DiskInfos[string(diskType)] + if !found { + continue + } for _, ecShardInfo := range diskInfo.EcShardInfos { - hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, writer) + hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, diskType, writer) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v\n", ecShardInfo.Id, volumeServer, err) } @@ -187,7 +195,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, return nil } -func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, writer io.Writer) (hasMoved bool, err error) { +func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType types.DiskType, writer io.Writer) (hasMoved bool, err error) { for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() { // Sort by: 1) fewest shards of this volume, 2) most free EC slots @@ -217,13 +225,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv collectionPrefix = ecShardInfo.Collection + "_" } vid := needle.VolumeId(ecShardInfo.Id) - destDiskId := pickBestDiskOnNode(emptyNode, vid) + // For evacuation, prefer same disk type but allow fallback to other types + destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false) if destDiskId > 0 { fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId) } else { fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) } - err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange) + err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, diskType) if err != nil { hasMoved = false return