mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-09 18:32:43 +00:00
7e608c877a
* refactor(ec_balance): make the balance planner per-volume ratio-capable Thread a per-volume EC ratio through the balance planner: Plan resolves each volume's data/parity from a new Options.VolumeRatio (falling back to the collection Ratio, then the build default, when it reports 0), and keys the global phase's ratio maps by volume instead of collection. The shell and worker balance paths build the per-volume lookup from each shard's heartbeat via the new ecbalancer.VolumeShardRatio. In OSS this is behavior-preserving: VolumeShardRatio returns 0 because the per-volume data_shards/parity_shards heartbeat fields are an enterprise feature, so every volume falls back to the collection ratio -- the existing standard-scheme behavior. The refactor keeps the shared planner in sync with the enterprise fork, which overrides VolumeShardRatio to classify and spread a mixed-ratio collection by each volume's own data/parity split. * perf(ec_balance): hoist the collection ratio out of the per-volume loop The collection ratio is constant for every volume in a collection, so resolve it once per collection instead of per volume; a custom Ratio func may do map lookups or locking. Addresses a review comment.
1103 lines
37 KiB
Go
1103 lines
37 KiB
Go
package shell
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"regexp"
|
|
"slices"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// Identifier types for topology elements. Each one is a distinct string type,
// so a data center id, an EC node id and a rack id cannot be mixed up without
// an explicit conversion.
type (
	DataCenterId string
	EcNodeId     string
	RackId       string
)
|
|
|
|
// EcDisk represents a single disk on a volume server
|
|
type EcDisk struct {
|
|
diskId uint32
|
|
diskType string
|
|
freeEcSlots int
|
|
ecShardCount int // Total EC shards on this disk
|
|
// Map of volumeId -> ShardsInfo for shards on this disk
|
|
ecShards map[needle.VolumeId]*erasure_coding.ShardsInfo
|
|
}
|
|
|
|
type EcNode struct {
|
|
info *master_pb.DataNodeInfo
|
|
dc DataCenterId
|
|
rack RackId
|
|
freeEcSlot int
|
|
// disks maps diskId -> EcDisk for disk-level balancing
|
|
disks map[uint32]*EcDisk
|
|
}
|
|
|
|
type CandidateEcNode struct {
|
|
ecNode *EcNode
|
|
shardCount int
|
|
}
|
|
|
|
type EcRack struct {
|
|
ecNodes map[EcNodeId]*EcNode
|
|
freeEcSlot int
|
|
}
|
|
|
|
var (
|
|
// ecBalanceAlgorithmDescription is a pseudocode outline of the EC balance
// steps, kept as a raw string. Its text is runtime data and must not be edited
// as if it were code.
ecBalanceAlgorithmDescription = `
|
|
func EcBalance() {
|
|
for each collection:
|
|
balanceEcVolumes(collectionName)
|
|
for each rack:
|
|
balanceEcRack(rack)
|
|
}
|
|
|
|
func balanceEcVolumes(collectionName){
|
|
for each volume:
|
|
doDeduplicateEcShards(volumeId)
|
|
|
|
tracks rack~shardCount mapping
|
|
for each volume:
|
|
doBalanceEcShardsAcrossRacks(volumeId)
|
|
|
|
for each volume:
|
|
doBalanceEcShardsWithinRacks(volumeId)
|
|
}
|
|
|
|
// spread ec shards into more racks
|
|
func doBalanceEcShardsAcrossRacks(volumeId){
|
|
tracks rack~volumeIdShardCount mapping
|
|
averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
|
|
ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
|
|
for each ecShardsToMove {
|
|
destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
|
|
destVolumeServers = volume servers on the destRack
|
|
pickOneEcNodeAndMoveOneShard(destVolumeServers)
|
|
}
|
|
}
|
|
|
|
func doBalanceEcShardsWithinRacks(volumeId){
|
|
racks = collect all racks that the volume id is on
|
|
for rack, shards := range racks
|
|
doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
|
|
}
|
|
|
|
// move ec shards
|
|
func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
|
|
tracks volumeServer~volumeIdShardCount mapping
|
|
averageShardCount = len(shards) / numVolumeServers
|
|
volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
|
|
ecShardsToMove = select overflown ec shards from volumeServersOverAverage
|
|
for each ecShardsToMove {
|
|
destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
|
|
pickOneEcNodeAndMoveOneShard(destVolumeServers)
|
|
}
|
|
}
|
|
|
|
// move ec shards while keeping shard distribution for the same volume unchanged or more even
|
|
func balanceEcRack(rack){
|
|
averageShardCount = total shards / numVolumeServers
|
|
for hasMovedOneEcShard {
|
|
sort all volume servers ordered by the number of local ec shards
|
|
pick the volume server A with the lowest number of ec shards x
|
|
pick the volume server B with the highest number of ec shards y
|
|
if y > averageShardCount and x +1 <= averageShardCount {
|
|
if B has a ec shard with volume id v that A does not have {
|
|
move one ec shard v from B to A
|
|
hasMovedOneEcShard = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`
|
|
// Overridable functions for testing.
|
|
// getDefaultReplicaPlacement defaults to _getDefaultReplicaPlacement;
// parseReplicaPlacementArg calls the master lookup through this variable.
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
|
|
)
|
|
|
|
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
|
|
var resp *master_pb.GetMasterConfigurationResponse
|
|
var err error
|
|
|
|
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
|
|
resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
|
|
}
|
|
|
|
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
|
|
var rp *super_block.ReplicaPlacement
|
|
var err error
|
|
|
|
if replicaStr != "" {
|
|
rp, err = super_block.NewReplicaPlacementFromString(replicaStr)
|
|
if err != nil {
|
|
return rp, err
|
|
}
|
|
glog.V(1).Infof("using replica placement %q for EC volumes\n", rp.String())
|
|
} else {
|
|
// No replica placement argument provided, resolve from master default settings.
|
|
rp, err = getDefaultReplicaPlacement(commandEnv)
|
|
if err != nil {
|
|
return rp, err
|
|
}
|
|
glog.V(1).Infof("using master default replica placement %q for EC volumes\n", rp.String())
|
|
}
|
|
|
|
return rp, nil
|
|
}
|
|
|
|
func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
|
|
if delayBeforeCollecting > 0 {
|
|
time.Sleep(delayBeforeCollecting)
|
|
}
|
|
|
|
var resp *master_pb.VolumeListResponse
|
|
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
|
|
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
|
|
}
|
|
|
|
func collectDataNodes(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) ([]*master_pb.DataNodeInfo, error) {
|
|
dataNodes := []*master_pb.DataNodeInfo{}
|
|
|
|
topo, _, err := collectTopologyInfo(commandEnv, delayBeforeCollecting)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, dci := range topo.GetDataCenterInfos() {
|
|
for _, r := range dci.GetRackInfos() {
|
|
for _, dn := range r.GetDataNodeInfos() {
|
|
dataNodes = append(dataNodes, dn)
|
|
}
|
|
}
|
|
}
|
|
|
|
return dataNodes, nil
|
|
}
|
|
|
|
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
|
|
// list all possible locations
|
|
// collect topology information
|
|
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// find out all volume servers with one slot left.
|
|
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)
|
|
|
|
sortEcNodesByFreeslotsDescending(ecNodes)
|
|
|
|
return
|
|
}
|
|
|
|
func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
|
|
return collectEcNodesForDC(commandEnv, "", diskType)
|
|
}
|
|
|
|
// assertEncodableRegularVolumes rejects volume ids that are not encodable
|
|
// regular volumes in the topology snapshot: an already-EC volume (present only
|
|
// as EC shards, with no .dat) or an id absent from the cluster. Encoding an
|
|
// already-EC volume would clear its shards before failing, destroying the only
|
|
// copy. A volume present as BOTH a regular .dat and stale orphan shards (a
|
|
// failed-encode retry) passes, so the retry + orphan sweep still works.
|
|
func assertEncodableRegularVolumes(t *master_pb.TopologyInfo, vids []needle.VolumeId) error {
|
|
want := make(map[needle.VolumeId]bool, len(vids))
|
|
for _, vid := range vids {
|
|
want[vid] = true
|
|
}
|
|
regular := make(map[needle.VolumeId]bool)
|
|
hasEcShards := make(map[needle.VolumeId]bool)
|
|
for _, dc := range t.DataCenterInfos {
|
|
for _, r := range dc.RackInfos {
|
|
for _, dn := range r.DataNodeInfos {
|
|
for _, diskInfo := range dn.DiskInfos {
|
|
if diskInfo == nil {
|
|
continue
|
|
}
|
|
for _, vi := range diskInfo.VolumeInfos {
|
|
if want[needle.VolumeId(vi.Id)] {
|
|
regular[needle.VolumeId(vi.Id)] = true
|
|
}
|
|
}
|
|
for _, ec := range diskInfo.EcShardInfos {
|
|
if want[needle.VolumeId(ec.Id)] {
|
|
hasEcShards[needle.VolumeId(ec.Id)] = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
for _, vid := range vids {
|
|
if regular[vid] {
|
|
continue
|
|
}
|
|
if hasEcShards[vid] {
|
|
return fmt.Errorf("volume %d is already EC-encoded (no .dat replica); refusing to re-encode, which would destroy its shards", vid)
|
|
}
|
|
return fmt.Errorf("volume %d not found as a regular volume in the cluster; refusing to encode", vid)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// collectVolumeIdToCollection returns a map from volume ID to its collection name
|
|
func collectVolumeIdToCollection(t *master_pb.TopologyInfo, vids []needle.VolumeId) map[needle.VolumeId]string {
|
|
result := make(map[needle.VolumeId]string)
|
|
if len(vids) == 0 {
|
|
return result
|
|
}
|
|
|
|
vidSet := make(map[needle.VolumeId]bool)
|
|
for _, vid := range vids {
|
|
vidSet[vid] = true
|
|
}
|
|
|
|
for _, dc := range t.DataCenterInfos {
|
|
for _, r := range dc.RackInfos {
|
|
for _, dn := range r.DataNodeInfos {
|
|
for _, diskInfo := range dn.DiskInfos {
|
|
for _, vi := range diskInfo.VolumeInfos {
|
|
vid := needle.VolumeId(vi.Id)
|
|
if vidSet[vid] {
|
|
result[vid] = vi.Collection
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
|
|
if len(vids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
found := map[string]bool{}
|
|
for _, dc := range t.DataCenterInfos {
|
|
for _, r := range dc.RackInfos {
|
|
for _, dn := range r.DataNodeInfos {
|
|
for _, diskInfo := range dn.DiskInfos {
|
|
for _, vi := range diskInfo.VolumeInfos {
|
|
for _, vid := range vids {
|
|
if needle.VolumeId(vi.Id) == vid {
|
|
found[vi.Collection] = true
|
|
}
|
|
}
|
|
}
|
|
for _, ecs := range diskInfo.EcShardInfos {
|
|
for _, vid := range vids {
|
|
if needle.VolumeId(ecs.Id) == vid {
|
|
found[ecs.Collection] = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if len(found) == 0 {
|
|
return nil
|
|
}
|
|
|
|
collections := []string{}
|
|
for k, _ := range found {
|
|
collections = append(collections, k)
|
|
}
|
|
sort.Strings(collections)
|
|
return collections
|
|
}
|
|
|
|
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) {
|
|
|
|
if !commandEnv.isLocked() {
|
|
return fmt.Errorf("lock is lost")
|
|
}
|
|
|
|
copiedShardIds := []erasure_coding.ShardId{shardId}
|
|
|
|
if applyBalancing {
|
|
|
|
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
|
|
|
|
// ask destination node to copy shard and the ecx file from source node, and mount it
|
|
copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []erasure_coding.ShardId{shardId}, vid, collection, existingServerAddress, destDiskId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// unmount the to be deleted shards
|
|
err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// ask source node to delete the shard, and maybe the ecx file
|
|
err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if destDiskId > 0 {
|
|
fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
|
|
} else {
|
|
fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
|
|
}
|
|
|
|
}
|
|
|
|
destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType)
|
|
existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
|
|
targetServer *EcNode, shardIdsToCopy []erasure_coding.ShardId,
|
|
volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []erasure_coding.ShardId, err error) {
|
|
|
|
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
|
|
|
|
targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
|
|
err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
|
|
if targetAddress != existingLocation {
|
|
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
|
|
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
|
|
VolumeId: uint32(volumeId),
|
|
Collection: collection,
|
|
ShardIds: erasure_coding.ShardIdsToUint32(shardIdsToCopy),
|
|
CopyEcxFile: true,
|
|
CopyEcjFile: true,
|
|
CopyVifFile: true,
|
|
CopyEcsumFile: true, // propagate the bitrot sidecar with the shards (no-op if the source has none)
|
|
SourceDataNode: string(existingLocation),
|
|
DiskId: destDiskId,
|
|
})
|
|
if copyErr != nil {
|
|
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
|
|
}
|
|
}
|
|
|
|
fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
|
|
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
|
|
VolumeId: uint32(volumeId),
|
|
Collection: collection,
|
|
ShardIds: erasure_coding.ShardIdsToUint32(shardIdsToCopy),
|
|
})
|
|
if mountErr != nil {
|
|
return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
|
|
}
|
|
|
|
if targetAddress != existingLocation {
|
|
copiedShardIds = shardIdsToCopy
|
|
glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
|
|
for _, dc := range topo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
for _, dn := range rack.DataNodeInfos {
|
|
fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
|
|
slices.SortFunc(ecNodes, func(a, b *EcNode) int {
|
|
return b.freeEcSlot - a.freeEcSlot
|
|
})
|
|
}
|
|
|
|
func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
|
|
slices.SortFunc(ecNodes, func(a, b *EcNode) int {
|
|
return a.freeEcSlot - b.freeEcSlot
|
|
})
|
|
}
|
|
|
|
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
|
|
for _, eci := range ecShardInfos {
|
|
count += erasure_coding.GetShardCount(eci)
|
|
}
|
|
return
|
|
}
|
|
|
|
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
|
|
if dn.DiskInfos == nil {
|
|
return 0
|
|
}
|
|
diskInfo := dn.DiskInfos[string(diskType)]
|
|
if diskInfo == nil {
|
|
return 0
|
|
}
|
|
|
|
slots := int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
|
|
if slots < 0 {
|
|
return 0
|
|
}
|
|
|
|
return slots
|
|
}
|
|
|
|
func (ecNode *EcNode) localShardIdCount(vid uint32) int {
|
|
for _, diskInfo := range ecNode.info.DiskInfos {
|
|
for _, eci := range diskInfo.EcShardInfos {
|
|
if vid == eci.Id {
|
|
return erasure_coding.GetShardCount(eci)
|
|
}
|
|
}
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
|
|
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
|
|
return
|
|
}
|
|
|
|
freeEcSlots := countFreeShardSlots(dn, diskType)
|
|
ecNode := &EcNode{
|
|
info: dn,
|
|
dc: dc,
|
|
rack: rack,
|
|
freeEcSlot: int(freeEcSlots),
|
|
disks: make(map[uint32]*EcDisk),
|
|
}
|
|
|
|
// Build disk-level information from volumes and EC shards
|
|
// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
|
|
allDiskIds := make(map[uint32]string) // diskId -> diskType
|
|
for diskTypeKey, diskInfo := range dn.DiskInfos {
|
|
if diskInfo == nil {
|
|
continue
|
|
}
|
|
// Get all disk IDs from volumes
|
|
for _, vi := range diskInfo.VolumeInfos {
|
|
allDiskIds[vi.DiskId] = diskTypeKey
|
|
}
|
|
// Also get disk IDs from EC shards
|
|
for _, ecShardInfo := range diskInfo.EcShardInfos {
|
|
allDiskIds[ecShardInfo.DiskId] = diskTypeKey
|
|
}
|
|
}
|
|
|
|
// Group EC shards by disk_id
|
|
diskShards := make(map[uint32]map[needle.VolumeId]*erasure_coding.ShardsInfo)
|
|
for _, diskInfo := range dn.DiskInfos {
|
|
if diskInfo == nil {
|
|
continue
|
|
}
|
|
for _, eci := range diskInfo.EcShardInfos {
|
|
diskId := eci.DiskId
|
|
if diskShards[diskId] == nil {
|
|
diskShards[diskId] = make(map[needle.VolumeId]*erasure_coding.ShardsInfo)
|
|
}
|
|
vid := needle.VolumeId(eci.Id)
|
|
diskShards[diskId][vid] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci)
|
|
}
|
|
}
|
|
|
|
// Create EcDisk for each discovered disk
|
|
diskCount := len(allDiskIds)
|
|
if diskCount == 0 {
|
|
diskCount = 1
|
|
}
|
|
freePerDisk := int(freeEcSlots) / diskCount
|
|
|
|
for diskId, diskTypeStr := range allDiskIds {
|
|
shards := diskShards[diskId]
|
|
if shards == nil {
|
|
shards = make(map[needle.VolumeId]*erasure_coding.ShardsInfo)
|
|
}
|
|
totalShardCount := 0
|
|
for _, shardsInfo := range shards {
|
|
totalShardCount += shardsInfo.Count()
|
|
}
|
|
|
|
ecNode.disks[diskId] = &EcDisk{
|
|
diskId: diskId,
|
|
diskType: diskTypeStr,
|
|
freeEcSlots: freePerDisk,
|
|
ecShardCount: totalShardCount,
|
|
ecShards: shards,
|
|
}
|
|
}
|
|
|
|
ecNodes = append(ecNodes, ecNode)
|
|
totalFreeEcSlots += freeEcSlots
|
|
})
|
|
return
|
|
}
|
|
|
|
func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []erasure_coding.ShardId) error {
|
|
|
|
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
|
|
|
|
return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
|
|
VolumeId: uint32(volumeId),
|
|
Collection: collection,
|
|
ShardIds: erasure_coding.ShardIdsToUint32(toBeDeletedShardIds),
|
|
})
|
|
return deleteErr
|
|
})
|
|
|
|
}
|
|
|
|
// errFullTeardownNotAcked is returned when a reachable server completed the
// delete RPC without reporting full_teardown_done (a pre-upgrade volume
// server). The orphan sweep must treat it as fatal: such a node may still hold
// an orphan that a later copy would re-stamp into the new generation.
var errFullTeardownNotAcked = errors.New(
	"delete did not perform full teardown (pre-upgrade volume server?); a stale EC generation may remain",
)
|
|
|
|
// pingVolumeServer probes node liveness with an empty-target Ping, which is never
|
|
// maintenance-gated, and returns the raw Ping error (nil on success). It lets the
|
|
// orphan sweep disambiguate a delete codes.Unavailable: a Rust volume server in
|
|
// maintenance mode fails the maintenance-gated delete with Unavailable yet answers
|
|
// Ping, whereas a genuinely-down node fails Ping with a transport Unavailable too.
|
|
// A Go server returns Unknown for maintenance, which isNodeUnreachable already
|
|
// treats as fatal. The caller classifies the result with classifyNodeLiveness:
|
|
// only a Ping that itself transport-failed (codes.Unavailable) confirms the node
|
|
// is down; a nil error (reachable) or any other Ping error (inconclusive — e.g. a
|
|
// pre-Ping server returning Unimplemented, which means the node is up) is fatal.
|
|
func pingVolumeServer(grpcDialOption grpc.DialOption, location pb.ServerAddress) error {
|
|
return operation.WithVolumeServerClient(false, location, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
|
_, pingErr := client.Ping(context.Background(), &volume_server_pb.PingRequest{})
|
|
return pingErr
|
|
})
|
|
}
|
|
|
|
// unmountAndDeleteEcShardsQuiet unmounts then deletes shards on one server in a
|
|
// single connection, without the per-call logging the interactive helpers emit.
|
|
// Used by the orphan sweep, which fans out to every node x volume and would
|
|
// otherwise flood the shell with no-op lines.
|
|
func unmountAndDeleteEcShardsQuiet(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, location pb.ServerAddress, shardIds []erasure_coding.ShardId) error {
|
|
ids := erasure_coding.ShardIdsToUint32(shardIds)
|
|
return operation.WithVolumeServerClient(false, location, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
if _, err := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
|
|
VolumeId: uint32(volumeId),
|
|
ShardIds: ids,
|
|
}); err != nil {
|
|
return fmt.Errorf("unmount: %w", err)
|
|
}
|
|
resp, err := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
|
|
VolumeId: uint32(volumeId),
|
|
Collection: collection,
|
|
ShardIds: ids,
|
|
FullTeardown: true,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("delete: %w", err)
|
|
}
|
|
if !resp.GetFullTeardownDone() {
|
|
return fmt.Errorf("delete on %s: %w", location, errFullTeardownNotAcked)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedShardIds []erasure_coding.ShardId) error {
|
|
|
|
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)
|
|
|
|
return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
|
|
VolumeId: uint32(volumeId),
|
|
ShardIds: erasure_coding.ShardIdsToUint32(toBeUnmountedShardIds),
|
|
})
|
|
return deleteErr
|
|
})
|
|
}
|
|
|
|
func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedShardIds []erasure_coding.ShardId) error {
|
|
|
|
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)
|
|
|
|
return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
|
|
VolumeId: uint32(volumeId),
|
|
Collection: collection,
|
|
ShardIds: erasure_coding.ShardIdsToUint32(toBeMountedShardIds),
|
|
})
|
|
return mountErr
|
|
})
|
|
}
|
|
|
|
// ceilDivide returns a divided by b, rounded up (towards positive infinity).
//
// Go's integer division truncates towards zero, so the quotient only needs to
// be bumped when the division is inexact AND the exact result is positive, i.e.
// when both operands have the same sign. For a negative inexact result the
// truncated quotient already is the ceiling.
//
// Like the built-in division, it panics when b is zero.
func ceilDivide(a, b int) int {
	quotient := a / b
	if a%b != 0 && (a < 0) == (b < 0) {
		quotient++
	}
	return quotient
}
|
|
|
|
func findEcVolumeShardsInfo(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) *erasure_coding.ShardsInfo {
|
|
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
|
|
for _, shardInfo := range diskInfo.EcShardInfos {
|
|
if needle.VolumeId(shardInfo.Id) == vid {
|
|
return erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Returns an empty ShardsInfo struct on failure, to avoid potential nil dereferences.
|
|
return erasure_coding.NewShardsInfo()
|
|
}
|
|
|
|
// TODO: simplify me
|
|
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode {
|
|
|
|
foundVolume := false
|
|
diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
|
|
if found {
|
|
for _, ecsi := range diskInfo.EcShardInfos {
|
|
if needle.VolumeId(ecsi.Id) == vid {
|
|
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecsi)
|
|
oldShardCount := si.Count()
|
|
for _, shardId := range shardIds {
|
|
si.Set(erasure_coding.NewShardInfo(shardId, 0))
|
|
}
|
|
ecsi.EcIndexBits = si.Bitmap()
|
|
ecsi.ShardSizes = si.SizesInt64()
|
|
ecNode.freeEcSlot -= si.Count() - oldShardCount
|
|
foundVolume = true
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
diskInfo = &master_pb.DiskInfo{
|
|
Type: string(diskType),
|
|
}
|
|
ecNode.info.DiskInfos[string(diskType)] = diskInfo
|
|
}
|
|
|
|
if !foundVolume {
|
|
si := erasure_coding.NewShardsInfo()
|
|
for _, id := range shardIds {
|
|
si.Set(erasure_coding.NewShardInfo(id, 0))
|
|
}
|
|
diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
|
|
Id: uint32(vid),
|
|
Collection: collection,
|
|
EcIndexBits: si.Bitmap(),
|
|
ShardSizes: si.SizesInt64(),
|
|
DiskType: string(diskType),
|
|
})
|
|
ecNode.freeEcSlot -= si.Count()
|
|
}
|
|
|
|
return ecNode
|
|
}
|
|
|
|
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode {
|
|
|
|
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
|
|
for _, eci := range diskInfo.EcShardInfos {
|
|
if needle.VolumeId(eci.Id) == vid {
|
|
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci)
|
|
oldCount := si.Count()
|
|
for _, shardId := range shardIds {
|
|
si.Delete(shardId)
|
|
}
|
|
eci.EcIndexBits = si.Bitmap()
|
|
eci.ShardSizes = si.SizesInt64()
|
|
ecNode.freeEcSlot -= si.Count() - oldCount
|
|
}
|
|
}
|
|
}
|
|
|
|
return ecNode
|
|
}
|
|
|
|
// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
|
|
// It prefers disks of the specified type with fewer shards and more free slots
|
|
// When shardId is provided and dataShardCount > 0, it applies anti-affinity:
|
|
// - For data shards (shardId < dataShardCount): prefer disks without parity shards
|
|
// - For parity shards (shardId >= dataShardCount): prefer disks without data shards
|
|
// If strictDiskType is false, it will fall back to other disk types if no matching disk is found
|
|
func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool, shardId erasure_coding.ShardId, dataShardCount int) uint32 {
|
|
if len(ecNode.disks) == 0 {
|
|
return 0 // No disk info available, let the server decide
|
|
}
|
|
|
|
var bestDiskId uint32
|
|
bestScore := -1
|
|
var fallbackDiskId uint32
|
|
fallbackScore := -1
|
|
|
|
// Determine if we're placing a data or parity shard
|
|
isDataShard := dataShardCount > 0 && int(shardId) < dataShardCount
|
|
|
|
for diskId, disk := range ecNode.disks {
|
|
if disk.freeEcSlots <= 0 {
|
|
continue
|
|
}
|
|
|
|
// Check existing shards on this disk for this volume
|
|
existingShards := 0
|
|
hasDataShards := false
|
|
hasParityShards := false
|
|
if si, ok := disk.ecShards[vid]; ok {
|
|
existingShards = si.Count()
|
|
// Check what type of shards are on this disk
|
|
if dataShardCount > 0 {
|
|
for _, existingShardId := range si.Ids() {
|
|
if int(existingShardId) < dataShardCount {
|
|
hasDataShards = true
|
|
} else {
|
|
hasParityShards = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Score: prefer disks with fewer total shards and fewer shards of this volume
|
|
// Lower score is better
|
|
score := disk.ecShardCount*10 + existingShards*100
|
|
|
|
// Apply anti-affinity penalty if applicable
|
|
if dataShardCount > 0 {
|
|
if isDataShard && hasParityShards {
|
|
// Penalize placing data shard on disk with parity shards
|
|
score += 1000
|
|
} else if !isDataShard && hasDataShards {
|
|
// Penalize placing parity shard on disk with data shards
|
|
score += 1000
|
|
}
|
|
}
|
|
|
|
if disk.diskType == string(diskType) {
|
|
// Matching disk type - this is preferred
|
|
if bestScore == -1 || score < bestScore {
|
|
bestScore = score
|
|
bestDiskId = diskId
|
|
}
|
|
} else if !strictDiskType {
|
|
// Non-matching disk type - use as fallback if allowed
|
|
if fallbackScore == -1 || score < fallbackScore {
|
|
fallbackScore = score
|
|
fallbackDiskId = diskId
|
|
}
|
|
}
|
|
}
|
|
|
|
// Return matching disk type if found, otherwise fallback
|
|
if bestDiskId != 0 {
|
|
return bestDiskId
|
|
}
|
|
return fallbackDiskId
|
|
}
|
|
|
|
// ecBalancer drives an EC balance run: it collects the cluster's EC nodes, hands
|
|
// them to the shared ecbalancer planner, and executes the planned shard moves.
|
|
// The balancing policy lives in weed/storage/erasure_coding/ecbalancer, shared
|
|
// with the EC balance worker so the two cannot drift.
|
|
type ecBalancer struct {
|
|
commandEnv *CommandEnv
|
|
ecNodes []*EcNode
|
|
replicaPlacement *super_block.ReplicaPlacement
|
|
applyBalancing bool
|
|
maxParallelization int
|
|
diskType types.DiskType
|
|
}
|
|
|
|
// excludeNodes is a set of server addresses kept out of the balance as copy/move
|
|
// targets and sources. ec.encode passes the nodes its orphan sweep could not
|
|
// reach: such a node may still hold a stale-generation shard orphan, and pairing
|
|
// it with a new-generation shard from a balance copy would mix generations on one
|
|
// node. The standalone ec.balance command passes nil.
|
|
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool, excludeNodes map[pb.ServerAddress]struct{}) (err error) {
|
|
// collect all ec nodes
|
|
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Drop excluded nodes (and the slots they contribute) before planning so they
|
|
// can be neither a target nor a source for any move this balance plans.
|
|
if len(excludeNodes) > 0 {
|
|
kept := allEcNodes[:0]
|
|
var excludedFreeSlots int
|
|
for _, en := range allEcNodes {
|
|
if _, skip := excludeNodes[pb.NewServerAddressFromDataNode(en.info)]; skip {
|
|
excludedFreeSlots += en.freeEcSlot
|
|
glog.V(0).Infof("EC balance excluding node %s: skipped as unreachable by the encode orphan sweep", en.info.Id)
|
|
continue
|
|
}
|
|
kept = append(kept, en)
|
|
}
|
|
allEcNodes = kept
|
|
totalFreeEcSlots -= excludedFreeSlots
|
|
}
|
|
|
|
if totalFreeEcSlots < 1 {
|
|
return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
|
|
}
|
|
|
|
ecb := &ecBalancer{
|
|
commandEnv: commandEnv,
|
|
ecNodes: allEcNodes,
|
|
replicaPlacement: ecReplicaPlacement,
|
|
applyBalancing: applyBalancing,
|
|
maxParallelization: maxParallelization,
|
|
diskType: diskType,
|
|
}
|
|
|
|
if len(collections) == 0 {
|
|
glog.V(1).Infof("WARNING: No collections to balance EC volumes across.\n")
|
|
}
|
|
return ecb.balance(collections)
|
|
}
|
|
|
|
// shellECRatio resolves a collection's EC data/parity counts, defaulting to the
|
|
// standard scheme. This is the shell's plug-in point for custom ratios.
|
|
func shellECRatio(_ string) (int, int) {
|
|
// Custom EC ratios are an enterprise feature; OSS uses the standard scheme.
|
|
return erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount
|
|
}
|
|
|
|
// balance plans EC shard moves with the shared planner and executes them. When
|
|
// collections is empty all collections present are balanced.
|
|
func (ecb *ecBalancer) balance(collections []string) error {
|
|
topo, volumeRatio := toBalancerTopology(ecb.ecNodes, collections, ecb.diskType)
|
|
moves := ecbalancer.Plan(topo, ecbalancer.Options{
|
|
DiskType: string(ecb.diskType),
|
|
ImbalanceThreshold: 0, // the shell balances to an even distribution
|
|
ReplicaPlacement: ecb.replicaPlacement,
|
|
Ratio: shellECRatio,
|
|
// Prefer each volume's own heartbeat-reported ratio over the collection
|
|
// default so a mixed-ratio collection is spread per volume; 0 defers to
|
|
// shellECRatio (and is the always-0 OSS case).
|
|
VolumeRatio: volumeRatio,
|
|
// Balance the global phase by fractional fullness so heterogeneous-capacity
|
|
// nodes fill proportionally (matching the worker). This is identical to raw
|
|
// shard count when capacities are uniform.
|
|
GlobalUtilizationBased: true,
|
|
})
|
|
return ecb.executeMoves(moves)
|
|
}
|
|
|
|
// toBalancerTopology builds an ecbalancer.Topology from the shell's EcNode model,
|
|
// including the shards of the requested collections (all collections when empty).
|
|
// It also returns a per-volume ratio lookup built from each shard's heartbeat
|
|
// (0,0 when unreported, e.g. always in OSS), which Plan prefers over the
|
|
// collection ratio for mixed-ratio clusters.
|
|
func toBalancerTopology(ecNodes []*EcNode, collections []string, diskType types.DiskType) (*ecbalancer.Topology, func(collection string, vid uint32) (int, int)) {
|
|
allowed := make(map[string]bool, len(collections))
|
|
for _, c := range collections {
|
|
allowed[c] = true
|
|
}
|
|
|
|
type volRatioKey struct {
|
|
collection string
|
|
vid uint32
|
|
}
|
|
volRatios := make(map[volRatioKey][2]int)
|
|
|
|
topo := ecbalancer.NewTopology()
|
|
for _, en := range ecNodes {
|
|
rackKey := string(en.dc) + ":" + string(en.rack)
|
|
node := topo.AddNode(en.info.Id, string(en.dc), rackKey, en.freeEcSlot)
|
|
// Group by physical machine (host) so shards spread across machines, not just
|
|
// nodes; the id stays the node identity used for moves.
|
|
node.SetHost(pb.NewServerAddressFromDataNode(en.info).ToHost())
|
|
for diskId, d := range en.disks {
|
|
node.AddDisk(diskId, d.diskType, d.freeEcSlots, d.ecShardCount)
|
|
}
|
|
diskInfo, found := en.info.DiskInfos[string(diskType)]
|
|
if !found {
|
|
continue
|
|
}
|
|
for _, eci := range diskInfo.EcShardInfos {
|
|
if len(allowed) > 0 && !allowed[eci.Collection] {
|
|
continue
|
|
}
|
|
node.AddShards(eci.Id, eci.Collection, eci.DiskId, erasure_coding.ShardBits(eci.EcIndexBits))
|
|
if d, p := ecbalancer.VolumeShardRatio(eci); d > 0 || p > 0 {
|
|
volRatios[volRatioKey{eci.Collection, eci.Id}] = [2]int{d, p}
|
|
}
|
|
}
|
|
}
|
|
|
|
volumeRatio := func(collection string, vid uint32) (int, int) {
|
|
r := volRatios[volRatioKey{collection, vid}]
|
|
return r[0], r[1]
|
|
}
|
|
return topo, volumeRatio
|
|
}
|
|
|
|
// executeMoves carries out the planned moves. Phases run in order (a within-rack
|
|
// move can depend on a cross-rack move's result), and the independent moves
|
|
// within a phase run with up to maxParallelization concurrency. Apply mode does
|
|
// only the RPCs; dry-run mode runs sequentially and mutates the in-memory EcNode
|
|
// model so callers/tests can inspect the planned end state.
|
|
func (ecb *ecBalancer) executeMoves(moves []ecbalancer.Move) error {
|
|
byID := make(map[string]*EcNode, len(ecb.ecNodes))
|
|
for _, en := range ecb.ecNodes {
|
|
byID[en.info.Id] = en
|
|
}
|
|
|
|
// Plan emits moves grouped by phase; run each contiguous same-phase group
|
|
// together, waiting before the next so cross-phase dependencies hold.
|
|
for i := 0; i < len(moves); {
|
|
j := i
|
|
for j < len(moves) && moves[j].Phase == moves[i].Phase {
|
|
j++
|
|
}
|
|
if err := ecb.executePhase(byID, moves[i:j]); err != nil {
|
|
return err
|
|
}
|
|
i = j
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ecb *ecBalancer) executePhase(byID map[string]*EcNode, moves []ecbalancer.Move) error {
|
|
if !ecb.applyBalancing {
|
|
// Dry-run: sequential so the in-memory model updates are race-free and
|
|
// reflect the full plan for inspection.
|
|
for _, m := range moves {
|
|
if err := ecb.executeMove(byID, m); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
// Apply mode: parallelize across volumes, but run one volume's moves within a
|
|
// phase sequentially. Concurrent moves of the same volume to a node can race
|
|
// on its shared .ecx/.ecj/.vif sidecar files.
|
|
var order []uint32
|
|
byVol := make(map[uint32][]ecbalancer.Move)
|
|
for _, m := range moves {
|
|
if _, ok := byVol[m.VolumeID]; !ok {
|
|
order = append(order, m.VolumeID)
|
|
}
|
|
byVol[m.VolumeID] = append(byVol[m.VolumeID], m)
|
|
}
|
|
ewg := NewErrorWaitGroup(ecb.maxParallelization)
|
|
for _, vid := range order {
|
|
group := byVol[vid]
|
|
ewg.Add(func() error {
|
|
for _, m := range group {
|
|
if err := ecb.executeMove(byID, m); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
return ewg.Wait()
|
|
}
|
|
|
|
func (ecb *ecBalancer) executeMove(byID map[string]*EcNode, m ecbalancer.Move) error {
|
|
src := byID[m.SourceNode]
|
|
if src == nil {
|
|
return nil
|
|
}
|
|
vid := needle.VolumeId(m.VolumeID)
|
|
shardId := erasure_coding.ShardId(m.ShardID)
|
|
shardIds := []erasure_coding.ShardId{shardId}
|
|
|
|
if m.Phase == "dedup" {
|
|
fmt.Printf("dedup: delete ec shard %d.%d on %s\n", vid, shardId, m.SourceNode)
|
|
if !ecb.applyBalancing {
|
|
src.deleteEcVolumeShards(vid, shardIds, ecb.diskType)
|
|
return nil
|
|
}
|
|
grpcDialOption := ecb.commandEnv.option.GrpcDialOption
|
|
addr := pb.NewServerAddressFromDataNode(src.info)
|
|
if err := unmountEcShards(grpcDialOption, vid, addr, shardIds); err != nil {
|
|
return err
|
|
}
|
|
return sourceServerDeleteEcShards(grpcDialOption, m.Collection, vid, addr, shardIds)
|
|
}
|
|
|
|
dst := byID[m.TargetNode]
|
|
if dst == nil {
|
|
return nil
|
|
}
|
|
if m.TargetDisk > 0 {
|
|
fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", m.SourceNode, vid, shardId, m.TargetNode, m.TargetDisk)
|
|
} else {
|
|
fmt.Printf("%s moves ec shard %d.%d to %s\n", m.SourceNode, vid, shardId, m.TargetNode)
|
|
}
|
|
if !ecb.applyBalancing {
|
|
// Dry-run: update the in-memory model only.
|
|
return moveMountedShardToEcNode(ecb.commandEnv, src, m.Collection, vid, shardId, dst, m.TargetDisk, false, ecb.diskType)
|
|
}
|
|
return ecb.applyShardMoveRPC(src, dst, m.Collection, vid, shardId, m.TargetDisk)
|
|
}
|
|
|
|
// applyShardMoveRPC copies a shard to the destination disk, then unmounts and
|
|
// deletes it on the source. It does not touch the in-memory model, so it is safe
|
|
// to run concurrently across the moves of a phase.
|
|
func (ecb *ecBalancer) applyShardMoveRPC(src, dst *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destDiskId uint32) error {
|
|
grpcDialOption := ecb.commandEnv.option.GrpcDialOption
|
|
srcAddr := pb.NewServerAddressFromDataNode(src.info)
|
|
copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, dst, []erasure_coding.ShardId{shardId}, vid, collection, srcAddr, destDiskId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(copiedShardIds) == 0 {
|
|
return nil
|
|
}
|
|
if err := unmountEcShards(grpcDialOption, vid, srcAddr, copiedShardIds); err != nil {
|
|
return err
|
|
}
|
|
return sourceServerDeleteEcShards(grpcDialOption, collection, vid, srcAddr, copiedShardIds)
|
|
}
|
|
|
|
// compileCollectionPattern compiles a regex pattern for collection matching.
|
|
// Empty patterns match empty collections only.
|
|
// The special keyword CollectionDefault ("_default") matches empty collections.
|
|
func compileCollectionPattern(pattern string) (*regexp.Regexp, error) {
|
|
if pattern == "" {
|
|
// empty pattern matches empty collection
|
|
return regexp.Compile("^$")
|
|
}
|
|
if pattern == CollectionDefault {
|
|
// CollectionDefault keyword matches empty collection
|
|
return regexp.Compile("^$")
|
|
}
|
|
return regexp.Compile(pattern)
|
|
}
|