package shell

import (
	"flag"
	"fmt"
	"io"
	"path/filepath"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}

type commandVolumeFixReplication struct {
	collectionPattern *string
	// TODO: move parameter flags here so we don't shuffle them around via function calls.
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}

func (c *commandVolumeFixReplication) Help() string {
	return `add or remove replicas to volumes that are missing replicas or over-replicated

	This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.

	This command also finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication                                # do not take action
	volume.fix.replication -apply                         # actually delete or copy the volume files and mount the volume
	volume.fix.replication -collectionPattern=important*  # fix any collections with prefix "important"

	Note:
		* each time this will only add back one replica for each volume id that is under-replicated.
		  If multiple replicas are missing, e.g. the replica count is > 2, you may need to run this multiple times.
		* do not run this too quickly within seconds, since the new volume replica may take a few seconds
		  to register itself to the master.

`
}

func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool {
	return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas; currently disabled.
}

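// Do scans the topology in a loop: it classifies every volume as under-replicated,
// over-replicated, or misplaced, then adds missing replicas and (with -doDelete)
// removes extra or misplaced ones in parallel. With -apply it keeps iterating until
// no under-replicated volumes remain; without -apply it reports what it would do and
// exits after one pass.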
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
	applyChanges := volFixReplicationCommand.Bool("apply", false, "apply the fix")
	// TODO: remove this alias
	applyChangesAlias := volFixReplicationCommand.Bool("force", false, "apply the fix (alias for -apply)")
	verbose := volFixReplicationCommand.Bool("verbose", false, "show volumes being checked and their statuses")
	doDelete := volFixReplicationCommand.Bool("doDelete", true, "also delete over-replicated volumes besides fixing under-replication")
	doCheck := volFixReplicationCommand.Bool("doCheck", true, "also check synchronization before deleting")
	maxParallelization := volFixReplicationCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
	retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry")
	volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")

	if err = volFixReplicationCommand.Parse(args); err != nil {
		return nil
	}

	handleDeprecatedForceFlag(writer, volFixReplicationCommand, applyChangesAlias, applyChanges)
	infoAboutSimulationMode(writer, *applyChanges, "-apply")
	commandEnv.noLock = !*applyChanges

	if err = commandEnv.confirmIsLocked(args); *applyChanges && err != nil {
		return
	}

	ewg := NewErrorWaitGroup(*maxParallelization)
	underReplicatedVolumeIdsCount := 1
	for underReplicatedVolumeIdsCount > 0 {
		fixedVolumeReplicas := map[string]int{}

		// collect topology information
		if *verbose {
			fmt.Fprintf(writer, "wait 15 seconds and then collect topology information...\n")
		}
		topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second)
		if err != nil {
			return err
		}

		// find all volumes that need replication
		// collect all data nodes
		volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)

		if *verbose {
			fmt.Fprintf(writer, "collected topology: %d locations, %d volumes to check\n", len(allLocations), len(volumeReplicas))
		}

		if len(allLocations) == 0 {
			return fmt.Errorf("no data nodes at all")
		}

		// find all under-replicated, over-replicated, and misplaced volumes
		var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32
		for vid, replicas := range volumeReplicas {
			replica := replicas[0]
			replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))

			// build locations list for optional verbose output
			locations := make([]string, 0, len(replicas))
			for _, r := range replicas {
				locations = append(locations, r.location.String())
			}

			if *verbose {
				fmt.Fprintf(writer, "checking volume %d replication %s has %d replicas [%s]\n", replica.info.Id, replicaPlacement, len(replicas), strings.Join(locations, ", "))
			}

			switch {
			case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
				underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s, but under replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
			case isMisplaced(replicas, replicaPlacement):
				misplacedVolumeIds = append(misplacedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s is not well placed [%s]\n", replica.info.Id, replicaPlacement, strings.Join(locations, ", "))
			case replicaPlacement.GetCopyCount() < len(replicas):
				overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
			}
		}
		underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)

		if !commandEnv.isLocked() {
			return fmt.Errorf("lock is lost")
		}

		ewg.Reset()
		ewg.Add(func() error {
			// add one replica to each under-replicated volume, on the most underpopulated data nodes
			fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
			return err
		})
		if *doDelete {
			ewg.Add(func() error {
				return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete)
			})
			ewg.Add(func() error {
				return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume)
			})
		}
		if err := ewg.Wait(); err != nil {
			return err
		}

		if !*applyChanges {
			break
		}

		// check that the topology has been updated
		if len(fixedVolumeReplicas) > 0 {
			fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
			for k := range fixedVolumeReplicas {
				fixedVolumes = append(fixedVolumes, k)
			}
			volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
			if err != nil {
				return err
			}
			for _, volumeIdLocation := range volumeIdLocations {
				volumeId := volumeIdLocation.VolumeOrFileId
				volumeIdLocationCount := len(volumeIdLocation.Locations)
				i := 0
				for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
					fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
					time.Sleep(time.Duration(i+1) * time.Second * 7)
					volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
					if err != nil {
						return err
					}
					volumeIdLocationCount = len(volumeLocIds[0].Locations)
					if *retryCount <= i {
						return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
					}
					i += 1
				}
			}
		}
	}
	return nil
}

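// collectVolumeReplicaLocations walks every data node in the topology, grouping the
// discovered volume replicas by volume id, and also returns the flat list of all
// data node locations.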
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(string(dc), string(rack), dn)
		for _, diskInfo := range dn.DiskInfos {
			for _, v := range diskInfo.VolumeInfos {
				volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
					location: &loc,
					info:     v,
				})
			}
		}
		allLocations = append(allLocations, loc)
	})
	return volumeReplicas, allLocations
}

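// SelectOneVolumeFunc picks, from the replicas of one volume, the replica to delete.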
type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica

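// checkOneVolume loads the needle index of each of the two replicas and runs a
// volumeCheckDisk comparison between them (with applyChanges enabled, so repairable
// differences are synced), returning an error if the replicas cannot be reconciled.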
func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, commandEnv *CommandEnv) (err error) {
	aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
	defer func() {
		aDB.Close()
		bDB.Close()
	}()

	vcd := &volumeCheckDisk{
		writer:     writer,
		commandEnv: commandEnv,
		now:        time.Now(),

		verbose:            false,
		applyChanges:       true,
		syncDeletions:      false,
		nonRepairThreshold: float64(1),
	}

	// read index db
	if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil {
		return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
	}
	if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil {
		return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
	}
	if _, err = vcd.doVolumeCheckDisk(aDB, bDB, a, b); err != nil {
		return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
	}
	return
}

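// deleteOneVolume deletes one replica, chosen by selectOneVolumeFn, for each listed
// volume id. With doCheck enabled it first verifies the chosen replica against every
// other replica and skips the volume if any comparison fails.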
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
	if len(volumeIds) == 0 {
		// nothing to do
		return nil
	}

	for _, vid := range volumeIds {
		replicas := volumeReplicas[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))

		replica := selectOneVolumeFn(replicas, replicaPlacement)

		// check collection name pattern
		if *c.collectionPattern != "" {
			var matched bool
			if *c.collectionPattern == CollectionDefault {
				matched = replica.info.Collection == ""
			} else {
				var err error
				matched, err = filepath.Match(*c.collectionPattern, replica.info.Collection)
				if err != nil {
					return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
				}
			}
			if !matched {
				continue
			}
		}

		collectionIsMismatch := false
		for _, volumeReplica := range replicas {
			if volumeReplica.info.Collection != replica.info.Collection {
				fmt.Fprintf(writer, "skip deleting volume %d as collection %s mismatches %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection)
				collectionIsMismatch = true
			}
		}
		if collectionIsMismatch {
			continue
		}

		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)

		if !applyChanges {
			break
		}

		if doCheck {
			var checkErr error
			for _, replicaB := range replicas {
				if replicaB.location.dataNode == replica.location.dataNode {
					continue
				}
				if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv); checkErr != nil {
					fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr)
					break
				}
			}
			if checkErr != nil {
				continue
			}
		}

		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id),
			pb.NewServerAddressFromDataNode(replica.location.dataNode), false); err != nil {
			fmt.Fprintf(writer, "deleting volume %d from %s : %v\n", replica.info.Id, replica.location.dataNode.Id, err)
		}

	}
	return nil
}

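// fixUnderReplicatedVolumes adds one replica for each under-replicated volume id,
// retrying each volume up to retryCount extra times, and returns a map from fixed
// volume id to the replica count observed before the fix.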
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
	fixedVolumes = map[string]int{}

	if len(volumeIds) == 0 {
		return fixedVolumes, nil
	}

	if len(volumeIds) > volumesPerStep && volumesPerStep > 0 {
		volumeIds = volumeIds[0:volumesPerStep]
	}
	for _, vid := range volumeIds {
		for i := 0; i < retryCount+1; i++ {
			var copied bool
			if copied, err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil {
				if applyChanges && copied {
					fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
				}
				break
			} else {
				fmt.Fprintf(writer, "fixing under replicated volume %d: %v\n", vid, err)
			}
		}
	}
	return fixedVolumes, nil
}

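// fixOneUnderReplicatedVolume walks the data nodes, sorted by free volume slots, and
// replicates the volume from its most recently modified replica to the first node
// that satisfies the replica placement. It returns true if a new replica was placed
// (or would have been placed, when applyChanges is false).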
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) (bool, error) {
	replicas := volumeReplicas[vid]
	replica := pickOneReplicaToCopyFrom(replicas)
	replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
	foundNewLocation := false
	hasSkippedCollection := false
	keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
	fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
	for _, dst := range allLocations {
		// check whether data nodes satisfy the constraints
		if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
			// check collection name pattern
			if *c.collectionPattern != "" {
				var matched bool
				if *c.collectionPattern == CollectionDefault {
					matched = replica.info.Collection == ""
				} else {
					var err error
					matched, err = filepath.Match(*c.collectionPattern, replica.info.Collection)
					if err != nil {
						return false, fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
					}
				}
				if !matched {
					hasSkippedCollection = true
					break
				}
			}

			// ask the volume server to replicate the volume
			foundNewLocation = true
			fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

			if !applyChanges {
				// adjust volume count
				addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
				return true, nil
			}

			err := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(replica.info.Id),
				pb.NewServerAddressFromDataNode(replica.location.dataNode),
				pb.NewServerAddressFromDataNode(dst.dataNode),
				replica.info.DiskType)
			if err != nil {
				return false, err
			}

			// adjust volume count
			addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
			return true, nil
		}
	}

	if !foundNewLocation && !hasSkippedCollection {
		fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing: %d\n", replica.info.Id, replicaPlacement, len(replicas))
	}
	return false, nil
}

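// addVolumeCount adjusts the cached disk counters so later placement decisions in the
// same pass account for the replica that was just placed.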
func addVolumeCount(info *master_pb.DiskInfo, count int) {
	if info == nil {
		return
	}
	info.VolumeCount += int64(count)
	info.FreeVolumeCount -= int64(count)
}

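// keepDataNodesSorted orders the data nodes by free volume slots for the given disk
// type, most free first.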
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
	fn := capacityByFreeVolumeCount(diskType)
	slices.SortFunc(dataNodes, func(a, b location) int {
		return int(fn(b.dataNode) - fn(a.dataNode))
	})
}

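// satisfyReplicaCurrentLocation checks whether the existing replicas already span
// enough data centers and racks, and hold enough copies within one rack, to satisfy
// the replica placement.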
func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool {
	existingDataCenters, existingRacks, _ := countReplicas(replicas)

	if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) {
		return false
	}
	if replicaPlacement.DiffRackCount+1 > len(existingRacks) {
		return false
	}
	if replicaPlacement.SameRackCount > 0 {
		foundSatisfyRack := false
		for _, rackCount := range existingRacks {
			if rackCount >= replicaPlacement.SameRackCount+1 {
				foundSatisfyRack = true
			}
		}
		return foundSatisfyRack
	}
	return true
}

/*
	if on an existing data node {
		return false
	}

	if different from existing dcs {
		if lack on different dcs {
			return true
		} else {
			return false
		}
	}

	if not on primary dc {
		return false
	}

	if different from existing racks {
		if lack on different racks {
			return true
		} else {
			return false
		}
	}

	if not on primary rack {
		return false
	}

	if lack on same rack {
		return true
	} else {
		return false
	}
*/

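// satisfyReplicaPlacement reports whether adding one replica at possibleLocation
// would still respect the replica placement. A placement "xyz" requires x copies on
// other data centers, y copies on other racks within the same data center, and z
// copies on other servers within the same rack. For example, with placement "010" and
// one replica on dc1/rack1, a candidate on dc1/rack2 is accepted (it fills the
// different-rack slot), while any candidate on dc2 is rejected because the placement
// allows no extra data center.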
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataCenters, _, existingDataNodes := countReplicas(replicas)

	if _, found := existingDataNodes[possibleLocation.String()]; found {
		// avoid duplicated volume on the same data node
		return false
	}

	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure data center count is within limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lack on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}
	// now this is the same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is on one of the primary dcs
	primaryDcRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		primaryDcRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(primaryDcRacks)
	sameRackCount := primaryDcRacks[possibleLocation.Rack()]

	// ensure rack count is within limit
	if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
			// lack on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}
	// now this is the same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack; check copies within the same rack
	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lack on same rack
		return true
	} else {
		// adding this would go over the same rack limit
		return false
	}

}

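// findTopKeys returns all keys of m that share the highest count, along with that count.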
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}

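// isAmong reports whether key is present in keys.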
func isAmong(key string, keys []string) bool {
	return slices.Contains(keys, key)
}

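// VolumeReplica pairs one replica of a volume with the location that hosts it.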
type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}

type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}

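// Rack returns the rack key prefixed with the data center, so racks with the same
// name in different data centers are counted separately.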
func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}

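// pickOneReplicaToCopyFrom returns the most recently modified replica, the safest
// source to copy from.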
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
	mostRecent := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
			mostRecent = replica
		}
	}
	return mostRecent
}

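// countReplicas tallies the replicas per data center, per rack, and per data node.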
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc = make(map[string]int)
	diffRack = make(map[string]int)
	diffNode = make(map[string]int)
	for _, replica := range replicas {
		diffDc[replica.location.DataCenter()] += 1
		diffRack[replica.location.Rack()] += 1
		diffNode[replica.location.String()] += 1
	}
	return
}

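// pickOneReplicaToDelete prefers to delete the smallest replica, then the least
// recently modified one, then the one with the lowest compact revision.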
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
	slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
		if a.info.Size != b.info.Size {
			return int(a.info.Size - b.info.Size)
		}
		if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond {
			return int(a.info.ModifiedAtSecond - b.info.ModifiedAtSecond)
		}
		if a.info.CompactRevision != b.info.CompactRevision {
			return int(a.info.CompactRevision - b.info.CompactRevision)
		}
		return 0
	})

	return replicas[0]
}

// check and fix misplaced volumes

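// isMisplaced reports whether any single replica sits at a location that the replica
// placement would not accept given the other replicas.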
func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool {
	for i := 0; i < len(replicas); i++ {
		others := otherThan(replicas, i)
		if !satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) {
			return true
		}
	}
	return false
}

func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) {
	for i := 0; i < len(replicas); i++ {
		if index != i {
			others = append(others, replicas[i])
		}
	}
	return
}

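// pickOneMisplacedVolume picks the replica to delete among misplaced replicas: it
// prefers a replica whose removal leaves a well-placed set; if none qualifies, it
// falls back to the generic deletion preference.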
func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) {
	var deletionCandidates []*VolumeReplica
	for i := 0; i < len(replicas); i++ {
		others := otherThan(replicas, i)
		if !isMisplaced(others, replicaPlacement) {
			deletionCandidates = append(deletionCandidates, replicas[i])
		}
	}
	if len(deletionCandidates) > 0 {
		return pickOneReplicaToDelete(deletionCandidates, replicaPlacement)
	}

	return pickOneReplicaToDelete(replicas, replicaPlacement)
}