mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-13 21:31:32 +00:00
* fix(volume): don't fatal on missing .idx for remote-tiered volume A .vif left behind without its .idx (orphaned by a crashed move, partial copy, or hand-edit) would trip glog.Fatalf in checkIdxFile and take the whole volume server down on boot, killing every healthy volume on it too. For remote-tiered volumes treat it as a per-volume load error so the server can come up and the operator can clean up the stray .vif. Refs #9331. * fix(balance): skip remote-tiered volumes in admin balance detection The admin/worker balance detector had no equivalent of the shell-side guard ("does not move volume in remote storage" in command_volume_balance.go), so it scheduled moves on remote-tiered volumes. The "move" copies .idx/.vif to the destination and then calls Volume.Destroy on the source, which calls backendStorage.DeleteFile — deleting the remote object the destination's new .vif now points at. Populate HasRemoteCopy on the metrics emitted by both the admin maintenance scanner and the worker's master poll, then drop those volumes at the top of Detection. Fixes #9331. * Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix(volume): keep remote data on volume-move-driven delete The on-source delete after a volume move (admin/worker balance and shell volume.move) ran Volume.Destroy with no way to opt out of the remote-object cleanup. Volume.Destroy unconditionally calls backendStorage.DeleteFile for remote-tiered volumes, so a successful move would copy .idx/.vif to the destination and then nuke the cloud object the destination's new .vif was already pointing at. Add VolumeDeleteRequest.keep_remote_data and plumb it through Store.DeleteVolume / DiskLocation.DeleteVolume / Volume.Destroy. The balance task and shell volume.move set it to true; the post-tier-upload cleanup of other replicas and the over-replication trim in volume.fix.replication also set it to true since the remote object is still referenced. Other real-delete callers keep the default. The delete-before-receive path in VolumeCopy also sets it: the inbound copy carries a .vif that may reference the same cloud object as the existing volume. Refs #9331. * test(storage): in-process remote-tier integration tests Cover the four operations the user is most likely to run against a cloud-tiered volume — balance/move, vacuum, EC encode, EC decode — by registering a local-disk-backed BackendStorage as the "remote" tier and exercising the real Volume / DiskLocation / EC encoder code paths. Locks in: - Destroy(keepRemoteData=true) preserves the remote object (move case) - Destroy(keepRemoteData=false) deletes it (real-delete case) - Vacuum/compact on a remote-tier volume never deletes the remote object - EC encode requires the local .dat (callers must download first) - EC encode + rebuild round-trips after a tier-down Tests run in-process and finish in under a second total — no cluster, binary, or external storage required. * fix(rust-volume): keep remote data on volume-move-driven delete Mirror the Go fix in seaweed-volume: plumb keep_remote_data through grpc volume_delete → Store.delete_volume → DiskLocation.delete_volume → Volume.destroy, and skip the s3-tier delete_file call when the flag is set. The pre-receive cleanup in volume_copy passes true for the same reason as the Go side: the inbound copy carries a .vif that may reference the same cloud object as the existing volume. The Rust loader already warns rather than fataling on a stray .vif without an .idx (volume.rs load_index_inmemory / load_index_redb), so no counterpart to the Go fatal-on-missing-idx fix is needed. Refs #9331. * fix(volume): preserve remote tier on IO-error eviction; fix EC test target Two review nits: - Store.MaybeAddVolumes' periodic cleanup pass deleted IO-errored volumes with keepRemoteData=false, so a transient local fault on a remote-tiered volume would also nuke the cloud object. Track the delete reason via a parallel slice and pass keepRemoteData=v.HasRemoteFile() for IO-error evictions; TTL-expired evictions still pass false. - TestRemoteTier_ECEncodeDecode_AfterDownload deleted shards 0..3 but called them "parity" — by the klauspost/reedsolomon convention shards 0..DataShardsCount-1 are data and DataShardsCount..TotalShardsCount-1 are parity. Switch the loop to delete the parity range so the intent matches the indices. --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
519 lines
20 KiB
Go
519 lines
20 KiB
Go
package shell
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
)
|
|
|
|
func init() {
|
|
Commands = append(Commands, &commandVolumeTierMove{})
|
|
}
|
|
|
|
type volumeTierMoveJob struct {
|
|
src pb.ServerAddress
|
|
vid needle.VolumeId
|
|
}
|
|
|
|
type commandVolumeTierMove struct {
|
|
activeServers sync.Map
|
|
queues map[pb.ServerAddress]chan volumeTierMoveJob
|
|
//activeServers map[pb.ServerAddress]struct{}
|
|
//activeServersLock sync.Mutex
|
|
//activeServersCond *sync.Cond
|
|
}
|
|
|
|
func (c *commandVolumeTierMove) Name() string {
|
|
return "volume.tier.move"
|
|
}
|
|
|
|
func (c *commandVolumeTierMove) Help() string {
|
|
return `change a volume from one disk type to another
|
|
|
|
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4] [-toReplication=XYZ]
|
|
|
|
The command ensures the target replication is fully achieved on the destination tier
|
|
before deleting old replicas. This prevents data loss if a destination disk fails
|
|
before replication repair completes.
|
|
|
|
When -toReplication is specified, the moved volume is reconfigured with the new
|
|
replication setting. Otherwise, the volume's existing replication is preserved.
|
|
|
|
Note:
|
|
Use -collectionPattern="_default" to match only the default collection (volumes with no collection name).
|
|
Empty collectionPattern matches all collections.
|
|
|
|
`
|
|
}
|
|
|
|
func (c *commandVolumeTierMove) HasTag(CommandTag) bool {
|
|
return false
|
|
}
|
|
|
|
func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
|
|
|
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
|
collectionPattern := tierCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
|
|
fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
|
|
quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
|
|
source := tierCommand.String("fromDiskType", "", "the source disk type")
|
|
target := tierCommand.String("toDiskType", "", "the target disk type")
|
|
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
|
|
applyChange := tierCommand.Bool("apply", false, "actually apply the changes")
|
|
// TODO: remove this alias
|
|
applyChangeAlias := tierCommand.Bool("force", false, "actually apply the changes (alias for -apply)")
|
|
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
|
|
replicationString := tierCommand.String("toReplication", "", "the new target replication setting")
|
|
|
|
if err = tierCommand.Parse(args); err != nil {
|
|
return nil
|
|
}
|
|
|
|
handleDeprecatedForceFlag(writer, tierCommand, applyChangeAlias, applyChange)
|
|
infoAboutSimulationMode(writer, *applyChange, "-apply")
|
|
|
|
if err = commandEnv.confirmIsLocked(args); err != nil {
|
|
return
|
|
}
|
|
fromDiskType := types.ToDiskType(*source)
|
|
toDiskType := types.ToDiskType(*target)
|
|
|
|
if fromDiskType == toDiskType {
|
|
return fmt.Errorf("source tier %s is the same as target tier %s", fromDiskType, toDiskType)
|
|
}
|
|
|
|
// collect topology information
|
|
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// collect all volumes that should change
|
|
volumeIds, err := collectVolumeIdsForTierChange(topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fmt.Printf("tier move volumes: %v\n", volumeIds)
|
|
|
|
// Collect volume ID to collection name mapping for the sync operation
|
|
volumeIdToCollection := collectVolumeIdToCollection(topologyInfo, volumeIds)
|
|
|
|
_, allLocations := collectVolumeReplicaLocations(topologyInfo)
|
|
allLocations = filterLocationsByDiskType(allLocations, toDiskType)
|
|
keepDataNodesSorted(allLocations, toDiskType)
|
|
|
|
if len(allLocations) > 0 && *parallelLimit > 0 && *parallelLimit < len(allLocations) {
|
|
allLocations = allLocations[:*parallelLimit]
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
bufferLen := len(allLocations)
|
|
c.queues = make(map[pb.ServerAddress]chan volumeTierMoveJob)
|
|
|
|
for _, dst := range allLocations {
|
|
destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
|
|
c.queues[destServerAddress] = make(chan volumeTierMoveJob, bufferLen)
|
|
|
|
wg.Add(1)
|
|
go func(dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
|
|
defer wg.Done()
|
|
for job := range jobs {
|
|
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
|
|
|
|
locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(job.vid))
|
|
if !found {
|
|
fmt.Printf("volume %d not found", job.vid)
|
|
continue
|
|
}
|
|
|
|
unlock := c.Lock(job.src)
|
|
|
|
if applyChanges {
|
|
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil {
|
|
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
|
|
}
|
|
}
|
|
unlock()
|
|
}
|
|
}(dst, c.queues[destServerAddress], *applyChange)
|
|
}
|
|
|
|
for _, vid := range volumeIds {
|
|
collection := volumeIdToCollection[vid]
|
|
if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations); err != nil {
|
|
fmt.Printf("tier move volume %d: %v\n", vid, err)
|
|
}
|
|
allLocations = rotateDataNodes(allLocations)
|
|
}
|
|
for key, _ := range c.queues {
|
|
close(c.queues[key])
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *commandVolumeTierMove) Lock(key pb.ServerAddress) func() {
|
|
value, _ := c.activeServers.LoadOrStore(key, &sync.Mutex{})
|
|
mtx := value.(*sync.Mutex)
|
|
mtx.Lock()
|
|
|
|
return func() { mtx.Unlock() }
|
|
}
|
|
|
|
func filterLocationsByDiskType(dataNodes []location, diskType types.DiskType) (ret []location) {
|
|
for _, loc := range dataNodes {
|
|
_, found := loc.dataNode.DiskInfos[string(diskType)]
|
|
if found {
|
|
ret = append(ret, loc)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func rotateDataNodes(dataNodes []location) []location {
|
|
if len(dataNodes) > 0 {
|
|
return append(dataNodes[1:], dataNodes[0])
|
|
} else {
|
|
return dataNodes
|
|
}
|
|
}
|
|
|
|
func isOneOf(server string, locations []wdclient.Location) bool {
|
|
for _, loc := range locations {
|
|
if server == loc.Url {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location) (err error) {
|
|
// find volume location
|
|
locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
|
|
if !found {
|
|
return fmt.Errorf("volume %d not found", vid)
|
|
}
|
|
|
|
// find one server with the most empty volume slots with target disk type
|
|
hasFoundTarget := false
|
|
fn := capacityByFreeVolumeCount(toDiskType)
|
|
for _, dst := range allLocations {
|
|
if fn(dst.dataNode) > 0 && !hasFoundTarget {
|
|
// ask the volume server to replicate the volume
|
|
if isOneOf(dst.dataNode.Id, locations) {
|
|
continue
|
|
}
|
|
|
|
// Sync replicas and select the best one (with highest file count) for multi-replica volumes
|
|
// This addresses data inconsistency risk in multi-replica volumes (issue #7797)
|
|
// by syncing missing entries between replicas before moving
|
|
sourceLoc, selectErr := syncAndSelectBestReplica(
|
|
commandEnv.option.GrpcDialOption, vid, collection, locations, dst.dataNode.Id, writer)
|
|
if selectErr != nil {
|
|
fmt.Fprintf(writer, "failed to sync and select source replica for volume %d: %v\n", vid, selectErr)
|
|
continue
|
|
}
|
|
sourceVolumeServer := sourceLoc.ServerAddress()
|
|
|
|
if sourceVolumeServer == "" {
|
|
continue
|
|
}
|
|
hasFoundTarget = true
|
|
|
|
// adjust volume count
|
|
addVolumeCount(dst.dataNode.DiskInfos[string(toDiskType)], 1)
|
|
|
|
destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
|
|
c.queues[destServerAddress] <- volumeTierMoveJob{sourceVolumeServer, vid}
|
|
}
|
|
}
|
|
|
|
if !hasFoundTarget {
|
|
fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string) (err error) {
|
|
|
|
if !commandEnv.isLocked() {
|
|
return fmt.Errorf("lock is lost")
|
|
}
|
|
|
|
// mark all replicas as read only
|
|
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
|
|
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
|
|
}
|
|
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
|
|
|
|
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
|
|
// mark all replicas as writable
|
|
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true, false); err != nil {
|
|
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
|
|
}
|
|
|
|
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
|
|
}
|
|
|
|
// If move is successful and replication is not empty, alter moved volume's replication setting
|
|
if *replicationString != "" {
|
|
if err = configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, newAddress, *replicationString); err != nil {
|
|
// LiveMoveVolume already deleted sourceVolumeServer; mark surviving
|
|
// old replicas writable before aborting so the volume stays accessible.
|
|
restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer)
|
|
return fmt.Errorf("configure replication %s on volume %d at %s: %v", *replicationString, vid, newAddress, err)
|
|
}
|
|
}
|
|
|
|
// Ensure the required number of replicas exist on the target tier BEFORE
|
|
// deleting old replicas to avoid data-loss risk.
|
|
// Use the explicit -toReplication if given, otherwise preserve the volume's
|
|
// existing replication from the source tier.
|
|
preserveServers, replicateErr := c.ensureReplicationFulfilled(commandEnv, writer, vid, toDiskType, dst, *replicationString)
|
|
if replicateErr != nil {
|
|
// Replication not fully achieved — do NOT delete old replicas.
|
|
restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer)
|
|
return fmt.Errorf("volume %d moved to %s but failed to fulfill replication, old replicas preserved: %v", vid, dst.dataNode.Id, replicateErr)
|
|
}
|
|
|
|
// Mark preserved pre-existing target-tier replicas as writable.
|
|
// They were marked read-only at the start of the move and would otherwise
|
|
// stay read-only since we're keeping rather than deleting them.
|
|
for _, loc := range locations {
|
|
if preserveServers[loc.Url] {
|
|
if markErr := markVolumeWritable(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), true, false); markErr != nil {
|
|
glog.Errorf("mark volume %d as writable on preserved replica %s: %v", vid, loc.Url, markErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove old replicas that are NOT needed by the fulfilled replication.
|
|
// Skip the move destination, the already-deleted source, and any pre-existing
|
|
// target-tier replicas that were counted toward replication fulfillment.
|
|
for _, loc := range locations {
|
|
if loc.Url == dst.dataNode.Id || loc.ServerAddress() == sourceVolumeServer {
|
|
continue
|
|
}
|
|
if preserveServers[loc.Url] {
|
|
continue
|
|
}
|
|
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false, false); err != nil {
|
|
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// restoreSurvivingReplicasWritable marks old replicas writable after a failure,
|
|
// skipping the source that was already deleted by LiveMoveVolume.
|
|
func restoreSurvivingReplicasWritable(commandEnv *CommandEnv, vid needle.VolumeId, locations []wdclient.Location, deletedSource pb.ServerAddress) {
|
|
for _, loc := range locations {
|
|
if loc.ServerAddress() == deletedSource {
|
|
continue
|
|
}
|
|
if markErr := markVolumeWritable(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), true, false); markErr != nil {
|
|
glog.Errorf("mark volume %d as writable on %s: %v", vid, loc.Url, markErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ensureReplicationFulfilled creates additional replicas of the volume on the target tier
|
|
// to satisfy the requested replication placement. It re-collects topology after the initial
|
|
// move so it can see the newly placed volume and find suitable destinations for additional copies.
|
|
// It returns a set of server URLs (from the original locations) that host target-tier replicas
|
|
// counted toward fulfillment, so the caller can avoid deleting them during cleanup.
|
|
func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, movedDst location, replicationString string) (preserveServers map[string]bool, err error) {
|
|
preserveServers = make(map[string]bool)
|
|
sourceAddress := pb.NewServerAddressFromDataNode(movedDst.dataNode)
|
|
|
|
// Wait briefly for the master to receive heartbeats reflecting the move,
|
|
// then re-collect topology to get the current state.
|
|
topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Second)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("collect topology: %v", err)
|
|
}
|
|
|
|
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
|
|
allLocations = filterLocationsByDiskType(allLocations, toDiskType)
|
|
keepDataNodesSorted(allLocations, toDiskType)
|
|
|
|
existingReplicas := volumeReplicas[uint32(vid)]
|
|
if len(existingReplicas) == 0 {
|
|
return nil, fmt.Errorf("volume %d not found in topology after move", vid)
|
|
}
|
|
|
|
// Build a set of all data nodes that already host this volume (any disk type)
|
|
// so we don't try to VolumeCopy to a server that already has it.
|
|
nodesWithVolume := make(map[string]bool)
|
|
for _, r := range existingReplicas {
|
|
nodesWithVolume[r.location.dataNode.Id] = true
|
|
}
|
|
|
|
// Determine the target replication: use explicit -toReplication if given,
|
|
// otherwise read the volume's existing replication setting.
|
|
var replicaPlacement *super_block.ReplicaPlacement
|
|
if replicationString != "" {
|
|
replicaPlacement, err = super_block.NewReplicaPlacementFromString(replicationString)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse replication %s: %v", replicationString, err)
|
|
}
|
|
} else {
|
|
replicaPlacement, err = super_block.NewReplicaPlacementFromByte(byte(existingReplicas[0].info.ReplicaPlacement))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse existing replication for volume %d: %v", vid, err)
|
|
}
|
|
}
|
|
|
|
requiredCopies := replicaPlacement.GetCopyCount()
|
|
if requiredCopies <= 1 {
|
|
// No additional replicas needed (e.g., replication "000")
|
|
return preserveServers, nil
|
|
}
|
|
|
|
// Filter to only replicas on the target disk type (the newly moved one).
|
|
var targetTierReplicas []*VolumeReplica
|
|
for _, r := range existingReplicas {
|
|
if types.ToDiskType(r.info.DiskType) == toDiskType {
|
|
targetTierReplicas = append(targetTierReplicas, r)
|
|
// Track pre-existing target-tier replicas so the caller won't delete them.
|
|
preserveServers[r.location.dataNode.Id] = true
|
|
}
|
|
}
|
|
if len(targetTierReplicas) == 0 {
|
|
return nil, fmt.Errorf("volume %d not found on target tier %s in topology after move", vid, toDiskType)
|
|
}
|
|
|
|
// Ensure all existing target-tier replicas have the correct replication metadata.
|
|
// The primary moved replica is already configured in doMoveOneVolume, but there may
|
|
// be pre-existing replicas on the target tier that need updating.
|
|
if replicationString != "" {
|
|
for _, r := range targetTierReplicas {
|
|
addr := pb.NewServerAddressFromDataNode(r.location.dataNode)
|
|
if configErr := configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, addr, replicationString); configErr != nil {
|
|
return nil, fmt.Errorf("volume %d: failed to configure replication on existing replica %s: %v", vid, r.location.dataNode.Id, configErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
additionalCopiesNeeded := requiredCopies - len(targetTierReplicas)
|
|
if additionalCopiesNeeded <= 0 {
|
|
return preserveServers, nil
|
|
}
|
|
|
|
fmt.Fprintf(writer, "volume %d: creating %d additional replica(s) for replication %s\n", vid, additionalCopiesNeeded, replicaPlacement)
|
|
|
|
fn := capacityByFreeVolumeCount(toDiskType)
|
|
copiesMade := 0
|
|
for _, candidateDst := range allLocations {
|
|
if copiesMade >= additionalCopiesNeeded {
|
|
break
|
|
}
|
|
if fn(candidateDst.dataNode) <= 0 {
|
|
continue
|
|
}
|
|
// Skip nodes that already host this volume on any disk type to avoid
|
|
// VolumeCopy conflicts (e.g., same volume on source tier and target tier).
|
|
if nodesWithVolume[candidateDst.dataNode.Id] {
|
|
continue
|
|
}
|
|
if !satisfyReplicaPlacement(replicaPlacement, targetTierReplicas, candidateDst) {
|
|
continue
|
|
}
|
|
|
|
candidateAddress := pb.NewServerAddressFromDataNode(candidateDst.dataNode)
|
|
fmt.Fprintf(writer, "volume %d: replicating from %s to %s\n", vid, sourceAddress, candidateDst.dataNode.Id)
|
|
|
|
if copyErr := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, vid, sourceAddress, candidateAddress, toDiskType.ReadableString()); copyErr != nil {
|
|
return nil, fmt.Errorf("replicate volume %d to %s: %v", vid, candidateDst.dataNode.Id, copyErr)
|
|
}
|
|
|
|
// Configure replication on the new replica if an explicit -toReplication was given.
|
|
// Without it, VolumeCopy already preserves the source's replication from the super block.
|
|
if replicationString != "" {
|
|
if configErr := configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, candidateAddress, replicationString); configErr != nil {
|
|
return nil, fmt.Errorf("volume %d: failed to configure replication on %s: %v", vid, candidateDst.dataNode.Id, configErr)
|
|
}
|
|
}
|
|
|
|
// Track the new replica for placement decisions
|
|
targetTierReplicas = append(targetTierReplicas, &VolumeReplica{
|
|
location: &candidateDst,
|
|
info: targetTierReplicas[0].info,
|
|
})
|
|
addVolumeCount(candidateDst.dataNode.DiskInfos[string(toDiskType)], 1)
|
|
copiesMade++
|
|
}
|
|
|
|
if copiesMade < additionalCopiesNeeded {
|
|
return nil, fmt.Errorf("could only create %d of %d additional replicas for volume %d (replication %s): not enough eligible destinations", copiesMade, additionalCopiesNeeded, vid, replicaPlacement)
|
|
}
|
|
|
|
fmt.Fprintf(writer, "volume %d: replication %s fulfilled with %d total copies\n", vid, replicaPlacement, requiredCopies)
|
|
return preserveServers, nil
|
|
}
|
|
|
|
func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
|
|
|
|
quietSeconds := int64(quietPeriod / time.Second)
|
|
nowUnixSeconds := time.Now().Unix()
|
|
|
|
fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds)
|
|
|
|
vidMap := make(map[uint32]bool)
|
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
for _, diskInfo := range dn.DiskInfos {
|
|
for _, v := range diskInfo.VolumeInfos {
|
|
// check collection name pattern
|
|
if collectionPattern != "" {
|
|
var matched bool
|
|
if collectionPattern == CollectionDefault {
|
|
matched = v.Collection == ""
|
|
} else {
|
|
var matchErr error
|
|
matched, matchErr = filepath.Match(collectionPattern, v.Collection)
|
|
if matchErr != nil {
|
|
err = fmt.Errorf("collection pattern %q failed to match: %w", collectionPattern, matchErr)
|
|
return
|
|
}
|
|
}
|
|
if !matched {
|
|
continue
|
|
}
|
|
}
|
|
|
|
if v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
|
|
if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
|
|
vidMap[v.Id] = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
// Check if an error occurred during iteration and return early
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for vid := range vidMap {
|
|
vids = append(vids, needle.VolumeId(vid))
|
|
}
|
|
|
|
return
|
|
}
|