mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-09 18:32:43 +00:00
d02ee6d5df
The replica-placement rule (data-center/rack/same-node limits plus host anti-affinity) existed three times: the shell's satisfyReplicaPlacement/isGoodMove used by volume.balance, fix.replication, and tier.move, and a line-for-line port in the maintenance balance worker. Move the canonical logic into weed/topology/balancer on a shared Location type; the shell and worker keep thin adapters that convert their own location representation and call it. Behavior is unchanged (the shared IsGoodMove keeps the shell's reject-move-to-self guard, and all four replica test suites pass).
647 lines
22 KiB
Go
647 lines
22 KiB
Go
package shell
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"slices"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/topology/balancer"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
)
|
|
|
|
func init() {
|
|
Commands = append(Commands, &commandVolumeFixReplication{})
|
|
}
|
|
|
|
// commandVolumeFixReplication implements the "volume.fix.replication" shell
// command.
type commandVolumeFixReplication struct {
	// collectionPattern is bound to the -collectionPattern flag by Do and is
	// read by matchCollectionPattern; it is nil until Do has run.
	collectionPattern *string
	// TODO: move parameter flags here so we don't shuffle them around via function calls.
}
|
|
|
|
func (c *commandVolumeFixReplication) Name() string {
|
|
return "volume.fix.replication"
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) Help() string {
|
|
return `add or remove replicas to volumes that are missing replicas or over-replicated
|
|
|
|
This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
|
|
|
|
This command also finds all under-replicated volumes, and finds volume servers with free slots.
|
|
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
|
|
|
|
volume.fix.replication # do not take action
|
|
volume.fix.replication -apply # actually deleting or copying the volume files and mount the volume
|
|
volume.fix.replication -collectionPattern=important* # fix any collections with prefix "important"
|
|
|
|
Note:
|
|
* each time this will only add back one replica for each volume id that is under replicated.
|
|
If there are multiple replicas are missing, e.g. replica count is > 2, you may need to run this multiple times.
|
|
* do not run this too quickly within seconds, since the new volume replica may take a few seconds
|
|
to register itself to the master.
|
|
|
|
`
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool {
|
|
return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas.
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
|
|
|
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
|
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
|
|
applyChanges := volFixReplicationCommand.Bool("apply", false, "apply the fix")
|
|
// TODO: remove this alias
|
|
applyChangesAlias := volFixReplicationCommand.Bool("force", false, "apply the fix (alias for -apply)")
|
|
verbose := volFixReplicationCommand.Bool("verbose", false, "show volumes being checked and their statuses")
|
|
doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication")
|
|
doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting")
|
|
maxParallelization := volFixReplicationCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
|
|
retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry")
|
|
volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
|
|
|
|
if err = volFixReplicationCommand.Parse(args); err != nil {
|
|
return nil
|
|
}
|
|
|
|
handleDeprecatedForceFlag(writer, volFixReplicationCommand, applyChangesAlias, applyChanges)
|
|
infoAboutSimulationMode(writer, *applyChanges, "-apply")
|
|
commandEnv.noLock = !*applyChanges
|
|
|
|
if err = commandEnv.confirmIsLocked(args); *applyChanges && err != nil {
|
|
return
|
|
}
|
|
|
|
ewg := NewErrorWaitGroup(*maxParallelization)
|
|
underReplicatedVolumeIdsCount := 1
|
|
for underReplicatedVolumeIdsCount > 0 {
|
|
fixedVolumeReplicas := map[string]int{}
|
|
|
|
// collect topology information
|
|
if *verbose {
|
|
fmt.Fprintf(writer, "wait 15 seconds and then collect topology information...\n")
|
|
}
|
|
topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// find all volumes that needs replication
|
|
// collect all data nodes
|
|
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
|
|
|
|
if *verbose {
|
|
fmt.Fprintf(writer, "collected topology: %d locations, %d volumes to check\n", len(allLocations), len(volumeReplicas))
|
|
}
|
|
|
|
if len(allLocations) == 0 {
|
|
return fmt.Errorf("no data nodes at all")
|
|
}
|
|
|
|
// find all under replicated volumes
|
|
var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32
|
|
for vid, replicas := range volumeReplicas {
|
|
replica := replicas[0]
|
|
|
|
// Filter here so the termination counter matches what gets fixed; else -apply loops forever.
|
|
if !c.matchCollectionPattern(replica.info.Collection) {
|
|
continue
|
|
}
|
|
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
|
|
|
|
// build locations list for optional verbose output
|
|
locations := make([]string, 0, len(replicas))
|
|
for _, r := range replicas {
|
|
locations = append(locations, r.location.String())
|
|
}
|
|
|
|
if *verbose {
|
|
fmt.Fprintf(writer, "checking volume %d replication %s has %d replicas [%s]\n", replica.info.Id, replicaPlacement, len(replicas), strings.Join(locations, ", "))
|
|
}
|
|
|
|
switch {
|
|
case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
|
|
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
|
|
fmt.Fprintf(writer, "volume %d replication %s, but under replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
|
|
case isMisplaced(replicas, replicaPlacement):
|
|
misplacedVolumeIds = append(misplacedVolumeIds, vid)
|
|
fmt.Fprintf(writer, "volume %d replication %s is not well placed [%s]\n", replica.info.Id, replicaPlacement, strings.Join(locations, ", "))
|
|
case replicaPlacement.GetCopyCount() < len(replicas):
|
|
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
|
|
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
|
|
}
|
|
}
|
|
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
|
|
|
|
if !commandEnv.isLocked() {
|
|
return fmt.Errorf("lock is lost")
|
|
}
|
|
|
|
ewg.Reset()
|
|
ewg.Add(func() error {
|
|
// find the most underpopulated data nodes
|
|
fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
|
|
return err
|
|
})
|
|
if *doDelete {
|
|
ewg.Add(func() error {
|
|
return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete)
|
|
})
|
|
ewg.Add(func() error {
|
|
return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume)
|
|
})
|
|
}
|
|
if err := ewg.Wait(); err != nil {
|
|
return nil
|
|
}
|
|
|
|
if !*applyChanges {
|
|
break
|
|
}
|
|
|
|
// check that the topology has been updated
|
|
if len(fixedVolumeReplicas) > 0 {
|
|
fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
|
|
for k, _ := range fixedVolumeReplicas {
|
|
fixedVolumes = append(fixedVolumes, k)
|
|
}
|
|
volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, volumeIdLocation := range volumeIdLocations {
|
|
volumeId := volumeIdLocation.VolumeOrFileId
|
|
volumeIdLocationCount := len(volumeIdLocation.Locations)
|
|
i := 0
|
|
for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
|
|
fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
|
|
time.Sleep(time.Duration(i+1) * time.Second * 7)
|
|
volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
volumeIdLocationCount = len(volumeLocIds[0].Locations)
|
|
if *retryCount <= i {
|
|
return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
|
|
}
|
|
i += 1
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
|
|
volumeReplicas := make(map[uint32][]*VolumeReplica)
|
|
var allLocations []location
|
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
loc := newLocation(string(dc), string(rack), dn)
|
|
for _, diskInfo := range dn.DiskInfos {
|
|
for _, v := range diskInfo.VolumeInfos {
|
|
volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
|
|
location: &loc,
|
|
info: v,
|
|
})
|
|
}
|
|
}
|
|
allLocations = append(allLocations, loc)
|
|
})
|
|
return volumeReplicas, allLocations
|
|
}
|
|
|
|
// SelectOneVolumeFunc picks, from the replicas of one volume, the replica that
// deleteOneVolume should remove. deleteOneVolume skips the volume when the
// function returns nil.
type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica
|
|
|
|
// checkOneVolume compares the index of replica a against b. With
|
|
// applyChanges=false it is a read-only divergence check; the over-replication
|
|
// trim must use that mode so it does not push the soon-to-be-deleted replica's
|
|
// needles into the survivor (which would resurrect data and is the opposite of
|
|
// a safe trim).
|
|
func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, commandEnv *CommandEnv, applyChanges bool) (err error) {
|
|
aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
|
|
defer func() {
|
|
aDB.Close()
|
|
bDB.Close()
|
|
}()
|
|
|
|
vcd := &volumeCheckDisk{
|
|
writer: writer,
|
|
commandEnv: commandEnv,
|
|
now: time.Now(),
|
|
|
|
verbose: false,
|
|
applyChanges: applyChanges,
|
|
syncDeletions: false,
|
|
nonRepairThreshold: float64(1),
|
|
}
|
|
|
|
// read index db
|
|
if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil {
|
|
return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
|
|
}
|
|
if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil {
|
|
return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
|
|
}
|
|
if _, err = vcd.doVolumeCheckDisk(aDB, bDB, a, b); err != nil {
|
|
return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
// matchCollectionPattern reports whether collection matches -collectionPattern:
|
|
// empty matches everything, CollectionDefault matches the unnamed collection.
|
|
func (c *commandVolumeFixReplication) matchCollectionPattern(collection string) bool {
|
|
if *c.collectionPattern == "" {
|
|
return true
|
|
}
|
|
if *c.collectionPattern == CollectionDefault {
|
|
return collection == ""
|
|
}
|
|
return wildcard.MatchesWildcard(*c.collectionPattern, collection)
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
|
|
if len(volumeIds) == 0 {
|
|
// nothing to do
|
|
return nil
|
|
}
|
|
|
|
for _, vid := range volumeIds {
|
|
replicas := volumeReplicas[vid]
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
|
|
|
|
replica := selectOneVolumeFn(replicas, replicaPlacement)
|
|
if replica == nil {
|
|
fmt.Fprintf(writer, "skip trimming volume %d: no safe replica to delete (would leave only read-only survivors)\n", vid)
|
|
continue
|
|
}
|
|
|
|
collectionIsMismatch := false
|
|
for _, volumeReplica := range replicas {
|
|
if volumeReplica.info.Collection != replica.info.Collection {
|
|
fmt.Fprintf(writer, "skip delete volume %d as collection %s is mismatch: %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection)
|
|
collectionIsMismatch = true
|
|
}
|
|
}
|
|
if collectionIsMismatch {
|
|
continue
|
|
}
|
|
|
|
fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
|
|
|
|
if !applyChanges {
|
|
break
|
|
}
|
|
|
|
if doCheck {
|
|
var checkErr error
|
|
for _, replicaB := range replicas {
|
|
if replicaB.location.dataNode == replica.location.dataNode {
|
|
continue
|
|
}
|
|
// Read-only divergence check only: never write the doomed
|
|
// replica's needles into a survivor while trimming.
|
|
if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv, false); checkErr != nil {
|
|
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr)
|
|
break
|
|
}
|
|
}
|
|
if checkErr != nil {
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Surplus replica being trimmed; keep the remote object since other
|
|
// replicas of the same .vif still reference it.
|
|
if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id),
|
|
pb.NewServerAddressFromDataNode(replica.location.dataNode), false, true); err != nil {
|
|
fmt.Fprintf(writer, "deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
|
|
fixedVolumes = map[string]int{}
|
|
|
|
if len(volumeIds) == 0 {
|
|
return fixedVolumes, nil
|
|
}
|
|
|
|
if len(volumeIds) > volumesPerStep && volumesPerStep > 0 {
|
|
volumeIds = volumeIds[0:volumesPerStep]
|
|
}
|
|
for _, vid := range volumeIds {
|
|
for i := 0; i < retryCount+1; i++ {
|
|
var copied bool
|
|
if copied, err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil {
|
|
if applyChanges && copied {
|
|
fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
|
|
}
|
|
break
|
|
} else {
|
|
fmt.Fprintf(writer, "fixing under replicated volume %d: %v\n", vid, err)
|
|
}
|
|
}
|
|
}
|
|
return fixedVolumes, nil
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) (bool, error) {
|
|
replicas := volumeReplicas[vid]
|
|
replica := pickOneReplicaToCopyFrom(replicas)
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
|
|
foundNewLocation := false
|
|
keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
|
|
fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
|
|
for _, dst := range allLocations {
|
|
// check whether data nodes satisfy the constraints
|
|
if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
|
|
// ask the volume server to replicate the volume
|
|
foundNewLocation = true
|
|
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
|
|
|
|
if !applyChanges {
|
|
// adjust volume count
|
|
addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
|
|
return true, nil
|
|
}
|
|
|
|
err := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(replica.info.Id),
|
|
pb.NewServerAddressFromDataNode(replica.location.dataNode),
|
|
pb.NewServerAddressFromDataNode(dst.dataNode),
|
|
replica.info.DiskType)
|
|
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// adjust volume count
|
|
addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
if !foundNewLocation {
|
|
fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func addVolumeCount(info *master_pb.DiskInfo, count int) {
|
|
if info == nil {
|
|
return
|
|
}
|
|
info.VolumeCount += int64(count)
|
|
info.FreeVolumeCount -= int64(count)
|
|
}
|
|
|
|
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
|
|
fn := capacityByFreeVolumeCount(diskType)
|
|
slices.SortFunc(dataNodes, func(a, b location) int {
|
|
return int(fn(b.dataNode) - fn(a.dataNode))
|
|
})
|
|
}
|
|
|
|
func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool {
|
|
existingDataCenters, existingRacks, _ := countReplicas(replicas)
|
|
|
|
if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) {
|
|
return false
|
|
}
|
|
if replicaPlacement.DiffRackCount+1 > len(existingRacks) {
|
|
return false
|
|
}
|
|
if replicaPlacement.SameRackCount > 0 {
|
|
foundSatisfyRack := false
|
|
for _, rackCount := range existingRacks {
|
|
if rackCount >= replicaPlacement.SameRackCount+1 {
|
|
foundSatisfyRack = true
|
|
}
|
|
}
|
|
return foundSatisfyRack
|
|
}
|
|
return true
|
|
}
|
|
|
|
/*
	Placement decision for a candidate location, in pseudo-code:

	if on an existing data node {
		return false
	}

	if different from existing dcs {
		if replicas are still lacking on different dcs {
			return true
		} else {
			return false
		}
	}

	if not on primary dc {
		return false
	}

	if different from existing racks {
		if replicas are still lacking on different racks {
			return true
		} else {
			return false
		}
	}

	if not on primary rack {
		return false
	}

	if replicas are still lacking on the same rack {
		return true
	} else {
		return false
	}
*/
|
|
// satisfyReplicaPlacement reports whether placing a replica at possibleLocation
|
|
// is consistent with the replication policy given the existing replicas. Thin
|
|
// adapter over weed/topology/balancer so the shell and the maintenance worker
|
|
// share one placement implementation.
|
|
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
|
|
locs := make([]balancer.Location, len(replicas))
|
|
for i, r := range replicas {
|
|
locs[i] = toBalancerLocation(r.location)
|
|
}
|
|
return balancer.SatisfyReplicaPlacement(replicaPlacement, locs, toBalancerLocation(&possibleLocation))
|
|
}
|
|
|
|
// VolumeReplica pairs one copy of a volume with the place it is stored.
type VolumeReplica struct {
	// location identifies the data center, rack and data node holding this copy.
	location *location
	// info is the volume information message reported for this copy.
	info *master_pb.VolumeInformationMessage
}
|
|
|
|
type location struct {
|
|
dc string
|
|
rack string
|
|
dataNode *master_pb.DataNodeInfo
|
|
}
|
|
|
|
func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
|
|
return location{
|
|
dc: dc,
|
|
rack: rack,
|
|
dataNode: dataNode,
|
|
}
|
|
}
|
|
|
|
func (l location) String() string {
|
|
return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
|
|
}
|
|
|
|
func (l location) Rack() string {
|
|
return fmt.Sprintf("%s %s", l.dc, l.rack)
|
|
}
|
|
|
|
func (l location) DataCenter() string {
|
|
return l.dc
|
|
}
|
|
|
|
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
|
|
mostRecent := replicas[0]
|
|
for _, replica := range replicas {
|
|
if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
|
|
mostRecent = replica
|
|
}
|
|
}
|
|
return mostRecent
|
|
}
|
|
|
|
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
|
|
diffDc = make(map[string]int)
|
|
diffRack = make(map[string]int)
|
|
diffNode = make(map[string]int)
|
|
for _, replica := range replicas {
|
|
diffDc[replica.location.DataCenter()] += 1
|
|
diffRack[replica.location.Rack()] += 1
|
|
diffNode[replica.location.String()] += 1
|
|
}
|
|
return
|
|
}
|
|
|
|
// pickOneReplicaToDelete selects the replica to trim when over-replicated.
|
|
// It only ever removes the smallest of multiple healthy writable replicas: a
|
|
// ReadOnly/integrity-flagged replica is never chosen for deletion, and the
|
|
// trim is refused (returns nil) when removing a writable replica would leave
|
|
// only ReadOnly survivors. VolumeStatus file_count>0 alone cannot prove the
|
|
// survivors' .dat is readable, so we do not over-claim survivor health.
|
|
// pickSmallestReplica returns the smallest replica (ties broken by oldest then
|
|
// lowest compact revision), or nil for an empty set.
|
|
func pickSmallestReplica(replicas []*VolumeReplica) *VolumeReplica {
|
|
if len(replicas) == 0 {
|
|
return nil
|
|
}
|
|
sorted := slices.Clone(replicas)
|
|
slices.SortFunc(sorted, func(a, b *VolumeReplica) int {
|
|
if a.info.Size != b.info.Size {
|
|
return int(a.info.Size - b.info.Size)
|
|
}
|
|
if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond {
|
|
return int(a.info.ModifiedAtSecond - b.info.ModifiedAtSecond)
|
|
}
|
|
if a.info.CompactRevision != b.info.CompactRevision {
|
|
return int(a.info.CompactRevision - b.info.CompactRevision)
|
|
}
|
|
return 0
|
|
})
|
|
return sorted[0]
|
|
}
|
|
|
|
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
|
|
// Over-replication trim: only ever remove a writable replica, and only
|
|
// when another writable one survives, so a healthy copy is never deleted
|
|
// down to a read-only (e.g. full or integrity-flagged) survivor.
|
|
var writable []*VolumeReplica
|
|
for _, r := range replicas {
|
|
if !r.info.ReadOnly {
|
|
writable = append(writable, r)
|
|
}
|
|
}
|
|
if len(writable) < 2 {
|
|
return nil
|
|
}
|
|
// Prefer a writable replica whose removal still satisfies placement, so the
|
|
// trim does not strip the only replica in a required failure domain. Fall
|
|
// back to the smallest writable if none keeps placement (a later misplaced
|
|
// cycle then re-balances).
|
|
var placementSafe []*VolumeReplica
|
|
for i, r := range replicas {
|
|
if r.info.ReadOnly {
|
|
continue
|
|
}
|
|
if !isMisplaced(otherThan(replicas, i), replicaPlacement) {
|
|
placementSafe = append(placementSafe, r)
|
|
}
|
|
}
|
|
if len(placementSafe) > 0 {
|
|
return pickSmallestReplica(placementSafe)
|
|
}
|
|
return pickSmallestReplica(writable)
|
|
}
|
|
|
|
// check and fix misplaced volumes
|
|
|
|
func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool {
|
|
|
|
for i := 0; i < len(replicas); i++ {
|
|
others := otherThan(replicas, i)
|
|
if !satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) {
|
|
for i := 0; i < len(replicas); i++ {
|
|
if index != i {
|
|
others = append(others, replicas[i])
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) {
|
|
|
|
// Relocation, not over-replication: pick the smallest replica to delete
|
|
// and recreate at a correct placement. Unlike the trim this must still act
|
|
// on read-only replicas (e.g. a full but misplaced volume), so it does not
|
|
// use pickOneReplicaToDelete's writable-survivor guard.
|
|
var deletionCandidates []*VolumeReplica
|
|
for i := 0; i < len(replicas); i++ {
|
|
others := otherThan(replicas, i)
|
|
if !isMisplaced(others, replicaPlacement) {
|
|
deletionCandidates = append(deletionCandidates, replicas[i])
|
|
}
|
|
}
|
|
if toDelete = pickSmallestReplica(deletionCandidates); toDelete != nil {
|
|
return toDelete
|
|
}
|
|
|
|
return pickSmallestReplica(replicas)
|
|
|
|
}
|