mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-22 09:41:28 +00:00
* fix(storage): prune partial EC shards when sibling disk has healthy .dat (#9478) handleFoundEcxFile only checks for .dat in the same disk location as the EC shards. In a multi-disk volume server an interrupted encode can leave .ec?? + .ecx on disk B while the source .dat still lives on disk A: the per-disk loader sees no .dat next to .ecx, mistakes the leftover for a distributed-EC layout, and mounts the partial shards. The volume server then heartbeats both a regular replica and an EC shard for the same vid and the master keeps both. Sweep the store after per-disk loading and before the cross-disk reconcile to delete partial EC files when a healthy .dat for the same (collection, vid) exists on a sibling disk. Push DeletedEcShardsChan for every pruned shard so master forgets the new-shard message the per-disk pass already emitted, instead of waiting for the next periodic heartbeat. * fix(seaweed-volume): mirror prune of partial EC with sibling .dat (#9478) Rust port of the same Store-level prune added to weed/storage. The per-disk EC loader in disk_location.rs only checks for .dat in the same disk as the EC shards, so an interrupted encode that leaves .ec?? + .ecx on disk B while the source .dat sits on disk A is mounted as if it were a distributed-EC layout. The volume server then heartbeats both a regular replica and an EC shard for the same vid. Sweep the store after per-disk loading and before the cross-disk reconcile, dropping in-memory EcVolumes with fewer than DATA_SHARDS_COUNT shards when a .dat for the same (collection, vid) exists on a sibling disk, and remove all on-disk EC artefacts for them. The Rust heartbeat path already diff-emits deletes from the next ec_volumes snapshot, so no explicit delete-channel push is needed here. Tests cover both the issue 9478 layout and a distributed-EC layout with no .dat anywhere on the store, which must be left alone. * fix(storage): validate sibling .dat size before deleting partial EC (#9478) The earlier prune deleted partial EC files whenever any .dat for the same vid existed on a sibling disk — including a zero-byte shell. A shell is no more useful than the partial shard it would replace, and the partial shard might still combine with shards on other servers in a recoverable distributed-EC layout. Wiping it based on a corrupt sibling .dat is data loss masquerading as cleanup. Tighten the check: when the EC's .vif recorded a non-zero source size in datFileSize, require the sibling .dat to be at least that many bytes; otherwise fall back to "at least a superblock". The .vif value is what the encoder wrote at the moment the source was sealed, so a sibling .dat smaller than that is provably truncated. Carry the size through indexDatOwners alongside the location. The Rust port had the same gap and an additional bug behind it: EcVolume::new wasn't reading datFileSize from .vif, so the safety check always fell back to the superblock floor. Wire datFileSize through. The existing shard-size calculation in LocateEcShardNeedleInterval already uses dat_file_size when non-zero, so populating it also matches Go's behaviour there. Tests cover the truncated-sibling case in both ports.
This commit is contained in:
@@ -304,7 +304,11 @@ impl DiskLocation {
|
||||
}
|
||||
|
||||
/// Remove all EC-related files for a volume.
|
||||
fn remove_ec_volume_files(&self, collection: &str, vid: VolumeId) {
|
||||
/// `pub(crate)` so the Store-level cross-disk passes in
|
||||
/// `store_ec_reconcile.rs` can call it to scrub partial EC artefacts
|
||||
/// when a healthy `.dat` for the same vid lives on a sibling disk
|
||||
/// (seaweedfs/seaweedfs#9478).
|
||||
pub(crate) fn remove_ec_volume_files(&self, collection: &str, vid: VolumeId) {
|
||||
let base = volume_file_name(&self.directory, collection, vid);
|
||||
let idx_base = volume_file_name(&self.idx_directory, collection, vid);
|
||||
const MAX_SHARD_COUNT: usize = 32;
|
||||
|
||||
@@ -120,10 +120,15 @@ impl EcVolume {
|
||||
shards.push(None);
|
||||
}
|
||||
|
||||
// Read expire_at_sec and version from .vif if present (matches Go's MaybeLoadVolumeInfo).
|
||||
// Prefer the data dir; fall back to the idx dir for the
|
||||
// cross-disk reconcile case (#9212 / #9244).
|
||||
let (expire_at_sec, vif_version) = {
|
||||
// Read expire_at_sec, version, and dat_file_size from .vif if
|
||||
// present (matches Go's MaybeLoadVolumeInfo). Prefer the data
|
||||
// dir; fall back to the idx dir for the cross-disk reconcile
|
||||
// case (#9212 / #9244). `dat_file_size` is the source .dat
|
||||
// size at encode time, used both by `locate_ec_shard_needle`
|
||||
// for shard-size math and by the Store-level prune in
|
||||
// `store_ec_reconcile.rs` to verify a sibling-disk .dat is
|
||||
// plausibly the encoding source (#9478).
|
||||
let (expire_at_sec, vif_version, vif_dat_file_size) = {
|
||||
let vif_path = locate_vif_path(dir, dir_idx, collection, volume_id);
|
||||
if let Ok(vif_content) = std::fs::read_to_string(&vif_path) {
|
||||
if let Ok(vif_info) =
|
||||
@@ -134,12 +139,12 @@ impl EcVolume {
|
||||
} else {
|
||||
Version::current()
|
||||
};
|
||||
(vif_info.expire_at_sec, ver)
|
||||
(vif_info.expire_at_sec, ver, vif_info.dat_file_size)
|
||||
} else {
|
||||
(0, Version::current())
|
||||
(0, Version::current(), 0)
|
||||
}
|
||||
} else {
|
||||
(0, Version::current())
|
||||
(0, Version::current(), 0)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -150,7 +155,7 @@ impl EcVolume {
|
||||
dir_idx: dir_idx.to_string(),
|
||||
version: vif_version,
|
||||
shards,
|
||||
dat_file_size: 0,
|
||||
dat_file_size: vif_dat_file_size,
|
||||
data_shards,
|
||||
parity_shards,
|
||||
ecx_file: None,
|
||||
|
||||
@@ -85,6 +85,17 @@ impl Store {
|
||||
|
||||
self.locations.push(loc);
|
||||
|
||||
// First scrub partial EC artefacts left on one disk by an
|
||||
// interrupted encode while the source .dat still lives on a
|
||||
// sibling disk of the same store. The per-disk loader cannot
|
||||
// see the sibling .dat and so loads the partial shards as if
|
||||
// they were a distributed-EC layout, which makes the volume
|
||||
// server heartbeat both a regular replica and an EC shard set
|
||||
// for the same vid (seaweedfs/seaweedfs#9478). Running before
|
||||
// the cross-disk reconcile keeps that pass from later
|
||||
// re-loading shards we just cleaned up.
|
||||
self.prune_incomplete_ec_with_sibling_dat();
|
||||
|
||||
// After every disk has finished its per-disk EC scan, sweep
|
||||
// the store for shards that live on a disk without local index
|
||||
// files and load them by reaching across to a sibling disk's
|
||||
@@ -106,6 +117,7 @@ impl Store {
|
||||
tracing::error!("load_new_volumes error in {}: {}", loc.directory, e);
|
||||
}
|
||||
}
|
||||
self.prune_incomplete_ec_with_sibling_dat();
|
||||
self.reconcile_ec_shards_across_disks();
|
||||
}
|
||||
|
||||
|
||||
@@ -21,9 +21,22 @@ use std::fs;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::storage::disk_location::{is_ec_shard_extension, parse_collection_volume_id_pub};
|
||||
use crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT;
|
||||
use crate::storage::store::Store;
|
||||
use crate::storage::super_block::SUPER_BLOCK_SIZE;
|
||||
use crate::storage::types::VolumeId;
|
||||
|
||||
/// Sibling-disk `.dat` candidate for `prune_incomplete_ec_with_sibling_dat`.
|
||||
/// We record both the disk index and the file's size: the size is
|
||||
/// consulted before deleting any EC artefacts. A zero-byte or truncated
|
||||
/// `.dat` is not a credible fallback, and we'd rather leave the partial
|
||||
/// EC in place than wipe it based on garbage.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct DatOwnerInfo {
|
||||
location: usize,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
/// Key for orphan-shard reconciliation: collection + volume id. Two
|
||||
/// collections can re-use the same volume id, and we must only pair
|
||||
/// shards with their own `.ecx`.
|
||||
@@ -136,6 +149,187 @@ impl Store {
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove leftover EC artefacts on one disk when a healthy `.dat`
|
||||
/// for the same `(collection, vid)` lives on a sibling disk of the
|
||||
/// same store. Mirrors `Store.pruneIncompleteEcWithSiblingDat` in
|
||||
/// `weed/storage/store_ec_reconcile.go` (seaweedfs/seaweedfs#9478).
|
||||
///
|
||||
/// `DiskLocation::handle_found_ecx_file` only looks for `.dat` in
|
||||
/// the same disk as the EC shards. In a multi-disk volume server an
|
||||
/// interrupted EC encode can leave `.ec??` + `.ecx` on disk B while
|
||||
/// the source `.dat` still lives on disk A. The per-disk loader
|
||||
/// sees no local `.dat`, classifies the layout as distributed EC,
|
||||
/// and mounts the partial shards. The volume server then heartbeats
|
||||
/// both a regular replica and an EC shard for the same vid, and
|
||||
/// master ends up with a contradictory view.
|
||||
///
|
||||
/// Cleanup is gated on `shard_count < DATA_SHARDS_COUNT` so that a
|
||||
/// deliberate "full local EC, `.dat` retained" layout split across
|
||||
/// two disks (`.dat` on disk A, all 10+ shards on disk B) is left
|
||||
/// alone — the per-disk loader already keeps that configuration on
|
||||
/// a single disk, and pruning it here would be a behaviour
|
||||
/// regression. Distributed EC volumes (no `.dat` on any disk of
|
||||
/// this server) also fall through unchanged because the `.dat`
|
||||
/// index never matches.
|
||||
///
|
||||
/// Before deleting any EC files we also check that the sibling
|
||||
/// `.dat` is plausibly the encoding source: at least
|
||||
/// `SUPER_BLOCK_SIZE` bytes long, and — when the EC's `.vif`
|
||||
/// recorded a non-zero source size in `dat_file_size` — at least
|
||||
/// that many bytes. A zero-byte shell or a truncated `.dat` does
|
||||
/// not justify wiping the partial EC, because that EC shard may
|
||||
/// still combine usefully with shards on other servers in a
|
||||
/// recoverable distributed-EC layout.
|
||||
///
|
||||
/// We don't have to push anything to a deleted-shards channel
|
||||
/// here: the Rust heartbeat path in `server/heartbeat.rs` diffs
|
||||
/// the current `ec_volumes` snapshot against the previous one each
|
||||
/// tick, so the first heartbeat after the prune naturally emits a
|
||||
/// delete delta for whatever the per-disk pass had already
|
||||
/// reported.
|
||||
pub fn prune_incomplete_ec_with_sibling_dat(&mut self) {
|
||||
if self.locations.len() < 2 {
|
||||
return;
|
||||
}
|
||||
|
||||
let dat_owners = self.index_dat_owners();
|
||||
if dat_owners.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Snapshot under an immutable borrow so we can release it
|
||||
// before taking the mutable borrow needed for cleanup.
|
||||
struct Victim {
|
||||
loc_idx: usize,
|
||||
collection: String,
|
||||
vid: VolumeId,
|
||||
dat_dir: String,
|
||||
shard_count: usize,
|
||||
}
|
||||
let mut victims: Vec<Victim> = Vec::new();
|
||||
for (loc_idx, loc) in self.locations.iter().enumerate() {
|
||||
for (vid, ev) in loc.ec_volumes() {
|
||||
let shard_count = ev.shard_count();
|
||||
if shard_count >= DATA_SHARDS_COUNT {
|
||||
continue;
|
||||
}
|
||||
let key = EcKey {
|
||||
collection: ev.collection.clone(),
|
||||
vid: *vid,
|
||||
};
|
||||
let Some(owner) = dat_owners.get(&key) else {
|
||||
continue;
|
||||
};
|
||||
if owner.location == loc_idx {
|
||||
// Same-disk .dat is the job of
|
||||
// DiskLocation::validate_ec_volume during the
|
||||
// per-disk pass; don't second-guess it here.
|
||||
continue;
|
||||
}
|
||||
// Decide whether the sibling .dat is credible. Prefer
|
||||
// the size baked into .vif at encode time; fall back
|
||||
// to "at least a superblock" for old EC volumes whose
|
||||
// .vif predates the field.
|
||||
let required = if ev.dat_file_size > 0 {
|
||||
ev.dat_file_size as u64
|
||||
} else {
|
||||
SUPER_BLOCK_SIZE as u64
|
||||
};
|
||||
if owner.size < required {
|
||||
warn!(
|
||||
volume_id = vid.0,
|
||||
collection = %ev.collection,
|
||||
directory = %loc.directory,
|
||||
shard_count,
|
||||
sibling_dir = %self.locations[owner.location].directory,
|
||||
sibling_dat_size = owner.size,
|
||||
required,
|
||||
"sibling .dat is smaller than the EC source size; leaving partial EC in place so distributed reconstruction is still possible (issue 9478)",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
victims.push(Victim {
|
||||
loc_idx,
|
||||
collection: ev.collection.clone(),
|
||||
vid: *vid,
|
||||
dat_dir: self.locations[owner.location].directory.clone(),
|
||||
shard_count,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for v in victims {
|
||||
warn!(
|
||||
volume_id = v.vid.0,
|
||||
collection = %v.collection,
|
||||
directory = %self.locations[v.loc_idx].directory,
|
||||
shard_count = v.shard_count,
|
||||
required = DATA_SHARDS_COUNT,
|
||||
dat_dir = %v.dat_dir,
|
||||
"partial EC shards on this disk while a healthy .dat exists on a sibling disk; cleaning up leftover EC files (issue 9478)",
|
||||
);
|
||||
let loc = &mut self.locations[v.loc_idx];
|
||||
if let Some(mut ec_vol) = loc.remove_ec_volume(v.vid) {
|
||||
for _ in 0..ec_vol.shard_count() {
|
||||
crate::metrics::VOLUME_GAUGE
|
||||
.with_label_values(&[&ec_vol.collection, "ec_shards"])
|
||||
.dec();
|
||||
}
|
||||
// destroy() closes shard file handles before unlinking
|
||||
// (matters on Windows) and removes .ecx / .ecj / .vif.
|
||||
ec_vol.destroy();
|
||||
}
|
||||
// Also sweep any unmounted shard files (.ec00 .. .ec31)
|
||||
// that the per-disk loader skipped — destroy() only walks
|
||||
// the in-memory shards, but the disk may still hold others.
|
||||
loc.remove_ec_volume_files(&v.collection, v.vid);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns, for every `(collection, vid)`, the index of the first
|
||||
/// disk on this store that holds a `.dat` for it plus the file's
|
||||
/// size. Used by `prune_incomplete_ec_with_sibling_dat` so it can
|
||||
/// decide whether partial EC artefacts on another disk are
|
||||
/// leftovers of an interrupted encode AND whether the sibling
|
||||
/// `.dat` is large enough to be a credible fallback.
|
||||
///
|
||||
/// Any `.dat` `read_dir` can see is recorded — including zero-byte
|
||||
/// shells. Their mere presence means this volume was a regular
|
||||
/// volume on this server at some point, which rules out the
|
||||
/// "distributed EC, no .dat anywhere" reading. Whether the file is
|
||||
/// actually usable is the caller's call, made by comparing this
|
||||
/// size to the EC's recorded source size in `.vif`.
|
||||
fn index_dat_owners(&self) -> HashMap<EcKey, DatOwnerInfo> {
|
||||
let mut owners: HashMap<EcKey, DatOwnerInfo> = HashMap::new();
|
||||
for (loc_idx, loc) in self.locations.iter().enumerate() {
|
||||
let Ok(read) = fs::read_dir(&loc.directory) else {
|
||||
continue;
|
||||
};
|
||||
for ent in read.flatten() {
|
||||
if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
let name = ent.file_name().to_string_lossy().into_owned();
|
||||
let Some(base) = name.strip_suffix(".dat") else {
|
||||
continue;
|
||||
};
|
||||
let Some((collection, vid)) = parse_collection_volume_id_pub(base) else {
|
||||
continue;
|
||||
};
|
||||
let Ok(meta) = ent.metadata() else {
|
||||
continue;
|
||||
};
|
||||
owners
|
||||
.entry(EcKey { collection, vid })
|
||||
.or_insert(DatOwnerInfo {
|
||||
location: loc_idx,
|
||||
size: meta.len(),
|
||||
});
|
||||
}
|
||||
}
|
||||
owners
|
||||
}
|
||||
|
||||
/// Build a `(collection, vid) -> EcxOwnerInfo` map of which disk
|
||||
/// owns the `.ecx` file. `.ecx` normally lives in `IdxDirectory`
|
||||
/// but may have been written into the data directory before
|
||||
@@ -725,6 +919,249 @@ mod tests {
|
||||
assert!(ev.has_shard(0));
|
||||
assert!(ev.has_shard(1));
|
||||
}
|
||||
|
||||
/// Reproduces the issue 9478 layout for a single volume server:
|
||||
/// disk A holds a healthy `.dat` for vid 122; disk B holds a single
|
||||
/// `.ec01` plus `.ecx` / `.ecj` / `.vif` and no `.dat`. The per-disk
|
||||
/// loader mounts the partial shard on disk B as if it were a
|
||||
/// distributed-EC layout. The Store-level prune must unmount and
|
||||
/// delete the leftover EC files on disk B while leaving disk A's
|
||||
/// `.dat` untouched.
|
||||
#[test]
|
||||
fn test_prune_drops_partial_ec_when_sibling_disk_has_dat() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let dat_dir = tmp.path().join("sdd");
|
||||
let ec_dir = tmp.path().join("sdf");
|
||||
std::fs::create_dir_all(&dat_dir).unwrap();
|
||||
std::fs::create_dir_all(&ec_dir).unwrap();
|
||||
|
||||
let collection = "";
|
||||
let vid = 122u32;
|
||||
|
||||
// Disk A (sdd): a non-trivial .dat plus the regular-volume
|
||||
// sidecars. The .dat content doesn't matter for the prune —
|
||||
// load_existing_volumes refuses to mount it because the
|
||||
// superblock is absent — but its file name has to be present
|
||||
// so index_dat_owners records this disk as the .dat owner.
|
||||
let dat_path = dat_dir.join(format!("{}_{}.dat", collection, vid).trim_start_matches('_'));
|
||||
std::fs::write(&dat_path, vec![0u8; 1024]).unwrap();
|
||||
|
||||
// Disk B (sdf): partial EC — one shard, plus .ecx / .ecj / .vif.
|
||||
write_shard(ec_dir.to_str().unwrap(), collection, vid, 1);
|
||||
write_index_files(ec_dir.to_str().unwrap(), collection, vid, 10, 4);
|
||||
|
||||
let mut store = Store::new(NeedleMapKind::InMemory);
|
||||
store
|
||||
.add_location(
|
||||
dat_dir.to_str().unwrap(),
|
||||
dat_dir.to_str().unwrap(),
|
||||
100,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(0.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
store
|
||||
.add_location(
|
||||
ec_dir.to_str().unwrap(),
|
||||
ec_dir.to_str().unwrap(),
|
||||
100,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(0.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// After add_location for sdf, the per-disk loader has mounted
|
||||
// the partial shard AND the prune has run as part of
|
||||
// add_location's epilogue. Confirm the prune removed it.
|
||||
let ec_loc = &store.locations[1];
|
||||
assert!(
|
||||
ec_loc.find_ec_volume(VolumeId(vid)).is_none(),
|
||||
"partial EC volume should have been unmounted by the prune",
|
||||
);
|
||||
|
||||
let ec_base = ec_dir
|
||||
.join(format!("{}_{}", collection, vid).trim_start_matches('_'))
|
||||
.to_string_lossy()
|
||||
.into_owned();
|
||||
assert!(
|
||||
!std::path::Path::new(&format!("{}.ec01", ec_base)).exists(),
|
||||
"partial shard file should have been removed",
|
||||
);
|
||||
assert!(
|
||||
!std::path::Path::new(&format!("{}.ecx", ec_base)).exists(),
|
||||
".ecx should have been removed",
|
||||
);
|
||||
assert!(
|
||||
!std::path::Path::new(&format!("{}.ecj", ec_base)).exists(),
|
||||
".ecj should have been removed",
|
||||
);
|
||||
|
||||
// The .dat on the sibling disk must survive — pruning the
|
||||
// partial EC must never touch the healthy replica.
|
||||
assert!(
|
||||
dat_path.exists(),
|
||||
"healthy .dat on sibling disk should be left alone",
|
||||
);
|
||||
}
|
||||
|
||||
/// Safety guard for `prune_incomplete_ec_with_sibling_dat`: when
|
||||
/// the sibling `.dat` is smaller than the source size recorded in
|
||||
/// the EC's `.vif`, the `.dat` is clearly truncated and is not a
|
||||
/// credible fallback. The partial shard may still combine with
|
||||
/// shards on other servers in a recoverable distributed-EC layout,
|
||||
/// so we leave the EC files in place.
|
||||
#[test]
|
||||
fn test_prune_keeps_partial_ec_when_sibling_dat_is_smaller_than_vif_source_size() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let dat_dir = tmp.path().join("sdd");
|
||||
let ec_dir = tmp.path().join("sdf");
|
||||
std::fs::create_dir_all(&dat_dir).unwrap();
|
||||
std::fs::create_dir_all(&ec_dir).unwrap();
|
||||
|
||||
let collection = "logs";
|
||||
let vid = 122u32;
|
||||
|
||||
// Tiny but loadable .dat on sdd: a valid 8-byte version-3
|
||||
// superblock so Volume::new succeeds, padded out to a few
|
||||
// hundred bytes so we have something well below the .vif-
|
||||
// recorded source size below. Anything smaller would let
|
||||
// Volume::new's auto-write extend the file to SUPER_BLOCK_SIZE
|
||||
// and obscure the truncation we're trying to model.
|
||||
let dat_path = dat_dir.join(format!("{}_{}.dat", collection, vid));
|
||||
let mut dat_bytes = crate::storage::super_block::SuperBlock::default().to_bytes();
|
||||
dat_bytes.resize(256, 0u8);
|
||||
std::fs::write(&dat_path, &dat_bytes).unwrap();
|
||||
let sibling_dat_size = dat_bytes.len() as u64;
|
||||
let source_size = 10 * 1024 * 1024u64; // 10 MiB recorded in .vif
|
||||
assert!(sibling_dat_size < source_size);
|
||||
|
||||
// Partial EC: one shard plus .ecx / .ecj / .vif. The .vif
|
||||
// records the source .dat size at encode time; the safety
|
||||
// check compares the sibling .dat against this value.
|
||||
write_shard(ec_dir.to_str().unwrap(), collection, vid, 1);
|
||||
let vif = VifVolumeInfo {
|
||||
version: 3,
|
||||
dat_file_size: source_size as i64,
|
||||
ec_shard_config: Some(VifEcShardConfig {
|
||||
data_shards: 10,
|
||||
parity_shards: 4,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
std::fs::write(
|
||||
ec_dir.join(format!("{}_{}.vif", collection, vid)),
|
||||
serde_json::to_string(&vif).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
std::fs::write(
|
||||
ec_dir.join(format!("{}_{}.ecx", collection, vid)),
|
||||
vec![0u8; 20],
|
||||
)
|
||||
.unwrap();
|
||||
std::fs::write(
|
||||
ec_dir.join(format!("{}_{}.ecj", collection, vid)),
|
||||
b"",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut store = Store::new(NeedleMapKind::InMemory);
|
||||
store
|
||||
.add_location(
|
||||
dat_dir.to_str().unwrap(),
|
||||
dat_dir.to_str().unwrap(),
|
||||
100,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(0.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
store
|
||||
.add_location(
|
||||
ec_dir.to_str().unwrap(),
|
||||
ec_dir.to_str().unwrap(),
|
||||
100,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(0.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Prune ran as part of add_location's epilogue. Confirm the
|
||||
// partial EC was left alone because the sibling .dat is too
|
||||
// small to be the encoding source.
|
||||
let ec_loc = &store.locations[1];
|
||||
let ev = ec_loc
|
||||
.find_ec_volume(VolumeId(vid))
|
||||
.expect("partial EC volume must remain mounted when the sibling .dat is smaller than the .vif source size");
|
||||
assert_eq!(ev.shard_count(), 1);
|
||||
|
||||
let ec_base = ec_dir
|
||||
.join(format!("{}_{}", collection, vid))
|
||||
.to_string_lossy()
|
||||
.into_owned();
|
||||
assert!(
|
||||
std::path::Path::new(&format!("{}.ec01", ec_base)).exists(),
|
||||
"partial shard file must survive when the sibling .dat is truncated",
|
||||
);
|
||||
assert!(
|
||||
std::path::Path::new(&format!("{}.ecx", ec_base)).exists(),
|
||||
".ecx must survive when the sibling .dat is truncated",
|
||||
);
|
||||
}
|
||||
|
||||
/// A distributed EC volume — no `.dat` on any disk of this store —
|
||||
/// must not be touched by the prune even when shard count is below
|
||||
/// `DATA_SHARDS_COUNT`. Partial shard layouts are normal for
|
||||
/// distributed EC.
|
||||
#[test]
|
||||
fn test_prune_leaves_distributed_ec_alone() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let dir0 = tmp.path().join("data0");
|
||||
let dir1 = tmp.path().join("data1");
|
||||
std::fs::create_dir_all(&dir0).unwrap();
|
||||
std::fs::create_dir_all(&dir1).unwrap();
|
||||
|
||||
let collection = "logs";
|
||||
let vid = 7u32;
|
||||
|
||||
// Two shards on dir0 with .ecx, no .dat anywhere.
|
||||
write_shard(dir0.to_str().unwrap(), collection, vid, 1);
|
||||
write_shard(dir0.to_str().unwrap(), collection, vid, 2);
|
||||
write_index_files(dir0.to_str().unwrap(), collection, vid, 10, 4);
|
||||
|
||||
let mut store = Store::new(NeedleMapKind::InMemory);
|
||||
for dir in [&dir0, &dir1] {
|
||||
store
|
||||
.add_location(
|
||||
dir.to_str().unwrap(),
|
||||
dir.to_str().unwrap(),
|
||||
100,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(0.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let ev = store.locations[0]
|
||||
.find_ec_volume(VolumeId(vid))
|
||||
.expect("distributed EC volume should still be mounted on dir0");
|
||||
assert_eq!(
|
||||
ev.shard_count(),
|
||||
2,
|
||||
"no .dat anywhere on this store means the partial shards must be kept as distributed EC",
|
||||
);
|
||||
|
||||
let ec_base = dir0
|
||||
.join(format!("{}_{}", collection, vid))
|
||||
.to_string_lossy()
|
||||
.into_owned();
|
||||
assert!(std::path::Path::new(&format!("{}.ec01", ec_base)).exists());
|
||||
assert!(std::path::Path::new(&format!("{}.ec02", ec_base)).exists());
|
||||
assert!(std::path::Path::new(&format!("{}.ecx", ec_base)).exists());
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk a disk's data directory and return the `.ec??` shard files
|
||||
|
||||
@@ -288,6 +288,15 @@ func (ev *EcVolume) ShardSize() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// DatFileSize returns the source .dat file size as recorded in .vif at
|
||||
// EC encoding time. Zero for old EC volumes whose .vif predates the
|
||||
// field, or for .vif files we failed to parse. Used by the Store-level
|
||||
// prune in store_ec_reconcile.go to validate that a sibling-disk .dat
|
||||
// is plausibly the encoding source before deleting the partial EC.
|
||||
func (ev *EcVolume) DatFileSize() int64 {
|
||||
return ev.datFileSize
|
||||
}
|
||||
|
||||
func (ev *EcVolume) Size() (size uint64) {
|
||||
for _, shard := range ev.Shards {
|
||||
if shardSize := shard.Size(); shardSize > 0 {
|
||||
|
||||
@@ -154,6 +154,16 @@ func NewStore(
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// First, scrub partial EC artefacts left on one disk by an interrupted
|
||||
// encode while the source .dat still lives on a sibling disk of the
|
||||
// same store. The per-disk loader cannot see the sibling .dat and so
|
||||
// loads the partial shards as if they were a distributed-EC layout,
|
||||
// which makes the volume server heartbeat both a regular replica and
|
||||
// an EC shard set for the same vid (issue #9478). Running before the
|
||||
// cross-disk reconcile keeps that pass from later re-loading shards
|
||||
// we just cleaned up.
|
||||
s.pruneIncompleteEcWithSiblingDat()
|
||||
|
||||
// After every DiskLocation has finished its per-disk EC scan, sweep the
|
||||
// store for shards that live on a disk without local index files and
|
||||
// load them by reaching across to a sibling disk's .ecx / .ecj / .vif.
|
||||
|
||||
257
weed/storage/store_ec_hybrid_repro_test.go
Normal file
257
weed/storage/store_ec_hybrid_repro_test.go
Normal file
@@ -0,0 +1,257 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
// TestIssue9478_PartialEcOnSiblingDiskOfHealthyDat reproduces
|
||||
// https://github.com/seaweedfs/seaweedfs/issues/9478:
|
||||
//
|
||||
// - A single volume server has two disks. Disk A holds a healthy
|
||||
// volume_x.dat (with .idx + .vif). Disk B holds the leftovers of an
|
||||
// interrupted EC encode for the SAME volume: a single .ec?? shard plus
|
||||
// .ecx + .ecj + .vif, with no .dat next to them.
|
||||
//
|
||||
// The current per-disk EC loader (handleFoundEcxFile / validateEcVolume) only
|
||||
// checks for .dat in the same DiskLocation as the EC shards. Because Disk B
|
||||
// has no .dat, it concludes "this must be a distributed EC volume" and
|
||||
// happily loads the lone partial shard. The .dat on Disk A also loads as a
|
||||
// regular replica. The volume server then heartbeats BOTH a regular volume
|
||||
// and an EC shard for the same vid, which is the inconsistent state the
|
||||
// issue is describing.
|
||||
//
|
||||
// The expected behaviour is to recognise that a healthy .dat exists on a
|
||||
// sibling disk on the same store and clean up the partial EC files on
|
||||
// Disk B, exactly as we already do today when .dat and the partial EC files
|
||||
// sit on the same disk.
|
||||
func TestIssue9478_PartialEcOnSiblingDiskOfHealthyDat(t *testing.T) {
|
||||
collection := ""
|
||||
vid := needle.VolumeId(122)
|
||||
|
||||
root := t.TempDir()
|
||||
datDir := root + "/sdd"
|
||||
ecDir := root + "/sdf"
|
||||
if err := os.MkdirAll(datDir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir datDir: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(ecDir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir ecDir: %v", err)
|
||||
}
|
||||
|
||||
// Disk A (sdd): a healthy-looking volume_x.dat + .idx + .vif. We don't
|
||||
// run NewVolume here, only check what loadAllEcShards on Disk B does in
|
||||
// the presence of a same-server .dat. A truncated 10 MiB .dat is enough
|
||||
// for the EC validator to compute an expected shard size from.
|
||||
datFileSize := int64(10 * 1024 * 1024)
|
||||
datBase := erasure_coding.EcShardFileName(collection, datDir, int(vid))
|
||||
if f, err := os.Create(datBase + ".dat"); err == nil {
|
||||
if err := f.Truncate(datFileSize); err != nil {
|
||||
t.Fatalf("truncate dat: %v", err)
|
||||
}
|
||||
f.Close()
|
||||
} else {
|
||||
t.Fatalf("create dat: %v", err)
|
||||
}
|
||||
if f, err := os.Create(datBase + ".idx"); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(datBase + ".vif"); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
|
||||
// Disk B (sdf): only one EC shard plus .ecx + .ecj + .vif, no .dat.
|
||||
// This mirrors the issue 9478 listing for volume_server_4 / sdf.
|
||||
ecBase := erasure_coding.EcShardFileName(collection, ecDir, int(vid))
|
||||
expectedShardSize := calculateExpectedShardSize(datFileSize, erasure_coding.DataShardsCount)
|
||||
if f, err := os.Create(ecBase + erasure_coding.ToExt(1)); err == nil {
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
t.Fatalf("truncate shard: %v", err)
|
||||
}
|
||||
f.Close()
|
||||
} else {
|
||||
t.Fatalf("create shard: %v", err)
|
||||
}
|
||||
if f, err := os.Create(ecBase + ".ecx"); err == nil {
|
||||
f.WriteString("dummy ecx")
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(ecBase + ".ecj"); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(ecBase + ".vif"); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
|
||||
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
|
||||
makeDisk := func(dir string) *DiskLocation {
|
||||
dl := &DiskLocation{
|
||||
Directory: dir,
|
||||
DirectoryUuid: "test-uuid-" + dir,
|
||||
IdxDirectory: dir,
|
||||
DiskType: types.HddType,
|
||||
MinFreeSpace: minFreeSpace,
|
||||
}
|
||||
dl.volumes = make(map[needle.VolumeId]*Volume)
|
||||
dl.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
|
||||
return dl
|
||||
}
|
||||
datLoc := makeDisk(datDir)
|
||||
ecLoc := makeDisk(ecDir)
|
||||
|
||||
// Stand the disks up the same way NewStore does: per-disk EC scan
|
||||
// first, then the Store-level passes. The per-disk pass alone cannot
|
||||
// see the .dat on the sibling disk, so it mounts the partial shards;
|
||||
// the Store-level prune is what brings the cluster back to a
|
||||
// consistent state.
|
||||
store := &Store{
|
||||
Locations: []*DiskLocation{datLoc, ecLoc},
|
||||
NewEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16),
|
||||
DeletedEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16),
|
||||
}
|
||||
ecLoc.ecShardNotifyHandler = func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume) {
|
||||
store.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
|
||||
Id: uint32(vid),
|
||||
Collection: collection,
|
||||
}
|
||||
}
|
||||
|
||||
if err := ecLoc.loadAllEcShards(ecLoc.ecShardNotifyHandler); err != nil {
|
||||
t.Logf("loadAllEcShards on ecDir returned: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { closeEcVolumes(ecLoc) })
|
||||
|
||||
preShardCount := ecLoc.EcShardCount()
|
||||
preNewMessages := len(store.NewEcShardsChan)
|
||||
t.Logf("after per-disk EC load on sdf: inMemoryShards=%d newEcShardMessages=%d", preShardCount, preNewMessages)
|
||||
if preShardCount == 0 {
|
||||
t.Fatalf("test setup is no longer reproducing the bug: per-disk EC load did not mount the partial shard")
|
||||
}
|
||||
|
||||
store.pruneIncompleteEcWithSiblingDat()
|
||||
store.reconcileEcShardsAcrossDisks()
|
||||
|
||||
leftoverShard := util.FileExists(ecBase + erasure_coding.ToExt(1))
|
||||
leftoverEcx := util.FileExists(ecBase + ".ecx")
|
||||
leftoverEcj := util.FileExists(ecBase + ".ecj")
|
||||
loaded := ecLoc.EcShardCount()
|
||||
deleteMessages := len(store.DeletedEcShardsChan)
|
||||
|
||||
t.Logf("after Store-level cleanup: shardFileLeft=%v ecxLeft=%v ecjLeft=%v inMemoryShards=%d deletedEcShardMessages=%d",
|
||||
leftoverShard, leftoverEcx, leftoverEcj, loaded, deleteMessages)
|
||||
|
||||
if loaded != 0 {
|
||||
t.Fatalf("partial EC shard was still mounted (%d shards) after the Store-level prune; a healthy .dat exists on a sibling disk and the leftover should have been cleaned up", loaded)
|
||||
}
|
||||
if leftoverShard || leftoverEcx || leftoverEcj {
|
||||
t.Fatalf("partial EC files survived the Store-level prune (shard=%v ecx=%v ecj=%v); a healthy .dat exists on a sibling disk and the leftover should have been removed",
|
||||
leftoverShard, leftoverEcx, leftoverEcj)
|
||||
}
|
||||
if deleteMessages == 0 {
|
||||
t.Errorf("prune did not push DeletedEcShardsChan; master would have to wait for the next periodic heartbeat to forget the partial shard")
|
||||
}
|
||||
|
||||
// The .dat on the sibling disk must survive — pruning the partial EC
|
||||
// must never touch the healthy replica we are falling back to.
|
||||
if !util.FileExists(datBase + ".dat") {
|
||||
t.Fatalf("healthy .dat on sibling disk was removed by the prune; only the partial EC files should be cleaned up")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIssue9478_ZeroByteSiblingDatKeepsPartialEc is the safety guard for
|
||||
// pruneIncompleteEcWithSiblingDat: when the sibling .dat is zero bytes
|
||||
// (or smaller than what .vif recorded as the encoding source), we'd
|
||||
// rather keep the partial EC than delete it based on garbage. The
|
||||
// partial shard may still combine with shards on other servers in a
|
||||
// recoverable distributed-EC layout.
|
||||
func TestIssue9478_ZeroByteSiblingDatKeepsPartialEc(t *testing.T) {
|
||||
collection := ""
|
||||
vid := needle.VolumeId(122)
|
||||
|
||||
root := t.TempDir()
|
||||
datDir := root + "/sdd"
|
||||
ecDir := root + "/sdf"
|
||||
if err := os.MkdirAll(datDir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir datDir: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(ecDir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir ecDir: %v", err)
|
||||
}
|
||||
|
||||
// Sibling .dat is a zero-byte shell — the kind volume servers 3
|
||||
// and 5 in the issue 9478 report have.
|
||||
datBase := erasure_coding.EcShardFileName(collection, datDir, int(vid))
|
||||
if f, err := os.Create(datBase + ".dat"); err == nil {
|
||||
f.Close()
|
||||
} else {
|
||||
t.Fatalf("create zero-byte dat: %v", err)
|
||||
}
|
||||
|
||||
ecBase := erasure_coding.EcShardFileName(collection, ecDir, int(vid))
|
||||
datFileSizeForShard := int64(10 * 1024 * 1024)
|
||||
expectedShardSize := calculateExpectedShardSize(datFileSizeForShard, erasure_coding.DataShardsCount)
|
||||
if f, err := os.Create(ecBase + erasure_coding.ToExt(1)); err == nil {
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
t.Fatalf("truncate shard: %v", err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(ecBase + ".ecx"); err == nil {
|
||||
f.WriteString("dummy ecx")
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(ecBase + ".ecj"); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
if f, err := os.Create(ecBase + ".vif"); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
|
||||
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
|
||||
makeDisk := func(dir string) *DiskLocation {
|
||||
dl := &DiskLocation{
|
||||
Directory: dir,
|
||||
DirectoryUuid: "test-uuid-" + dir,
|
||||
IdxDirectory: dir,
|
||||
DiskType: types.HddType,
|
||||
MinFreeSpace: minFreeSpace,
|
||||
}
|
||||
dl.volumes = make(map[needle.VolumeId]*Volume)
|
||||
dl.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
|
||||
return dl
|
||||
}
|
||||
datLoc := makeDisk(datDir)
|
||||
ecLoc := makeDisk(ecDir)
|
||||
|
||||
store := &Store{
|
||||
Locations: []*DiskLocation{datLoc, ecLoc},
|
||||
NewEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16),
|
||||
DeletedEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16),
|
||||
}
|
||||
|
||||
if err := ecLoc.loadAllEcShards(nil); err != nil {
|
||||
t.Logf("loadAllEcShards on ecDir returned: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { closeEcVolumes(ecLoc) })
|
||||
if ecLoc.EcShardCount() == 0 {
|
||||
t.Fatalf("test setup is not reproducing the layout: per-disk EC load did not mount the partial shard")
|
||||
}
|
||||
|
||||
store.pruneIncompleteEcWithSiblingDat()
|
||||
|
||||
if ecLoc.EcShardCount() == 0 {
|
||||
t.Fatalf("prune deleted partial EC based on a zero-byte sibling .dat; the shard should have been left in place so distributed reconstruction is still possible")
|
||||
}
|
||||
if !util.FileExists(ecBase + erasure_coding.ToExt(1)) {
|
||||
t.Fatalf("prune deleted the partial shard file based on a zero-byte sibling .dat")
|
||||
}
|
||||
if len(store.DeletedEcShardsChan) != 0 {
|
||||
t.Errorf("prune pushed a DeletedEcShardsChan message despite skipping cleanup; nothing was actually deleted")
|
||||
}
|
||||
}
|
||||
@@ -7,10 +7,22 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
)
|
||||
|
||||
// datOwnerInfo records both the disk that holds a .dat for a given
|
||||
// (collection, vid) and the size on disk. The size is consulted by
|
||||
// pruneIncompleteEcWithSiblingDat before deleting any EC artefacts:
|
||||
// a zero-byte or truncated .dat is not a credible fallback, and we'd
|
||||
// rather leave the partial EC in place than wipe it based on garbage.
|
||||
type datOwnerInfo struct {
|
||||
location *DiskLocation
|
||||
size int64
|
||||
}
|
||||
|
||||
// ecKeyForReconcile keys orphan-shard reconciliation by collection + volume
|
||||
// id. Per-collection grouping matters because two collections can re-use the
|
||||
// same volume id, and we must only pair shards with their own .ecx file.
|
||||
@@ -125,6 +137,160 @@ func (s *Store) indexEcxOwners() map[ecKeyForReconcile]ecxOwnerInfo {
|
||||
return owners
|
||||
}
|
||||
|
||||
// pruneIncompleteEcWithSiblingDat removes leftover EC artefacts on one
|
||||
// disk when a healthy .dat for the same (collection, vid) lives on a
|
||||
// sibling disk of the same store. This is the cross-disk analogue of the
|
||||
// validateEcVolume cleanup in handleFoundEcxFile: a same-disk .dat next
|
||||
// to partial shards is already taken as proof that an EC encode was
|
||||
// interrupted, and the partial shards get removed so the .dat keeps
|
||||
// serving the volume. Per-disk loaders cannot see sibling disks, so when
|
||||
// the .dat ends up on disk A and the partial shards on disk B the per-disk
|
||||
// pass mistakes the leftover for a normal distributed-EC layout (no .dat
|
||||
// next to .ecx) and mounts the partial shards. The volume server then
|
||||
// heartbeats both a regular replica and an EC shard for the same vid, the
|
||||
// master keeps both entries, and reads route through either path
|
||||
// depending on the client. Issue 9478.
|
||||
//
|
||||
// Cleanup is gated on shardCount < DataShardsCount so that a deliberate
|
||||
// "full local EC, .dat retained" layout split across two disks (.dat on
|
||||
// disk A, all 10+ shards on disk B) is left alone — the per-disk loader
|
||||
// already keeps that configuration when everything is on a single disk,
|
||||
// and pruning it here would be a behaviour regression for operators who
|
||||
// rely on it. Distributed EC volumes (no .dat on any disk of this server)
|
||||
// also fall through unchanged because the lookup in the .dat index below
|
||||
// will simply not find a match.
|
||||
//
|
||||
// Before deleting any EC files we also check that the sibling .dat is
|
||||
// plausibly the encoding source: at least super_block.SuperBlockSize
|
||||
// bytes long, and — when the EC's .vif recorded a non-zero source size
|
||||
// in datFileSize — at least that many bytes. A zero-byte shell or a
|
||||
// truncated .dat does not justify wiping the partial EC, because that
|
||||
// EC shard may still combine usefully with shards on other servers in
|
||||
// a recoverable distributed-EC layout.
|
||||
//
|
||||
// We push DeletedEcShardsChan for every pruned shard so the master is told
|
||||
// to forget the registrations the per-disk pass already emitted on
|
||||
// NewEcShardsChan during startup, instead of waiting for the first
|
||||
// periodic heartbeat to reconcile.
|
||||
func (s *Store) pruneIncompleteEcWithSiblingDat() {
|
||||
if len(s.Locations) < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
datOwners := s.indexDatOwners()
|
||||
if len(datOwners) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for diskId, loc := range s.Locations {
|
||||
// Snapshot under the read lock so we are not iterating
|
||||
// ecVolumes while the cleanup below takes the write lock.
|
||||
type victim struct {
|
||||
collection string
|
||||
vid needle.VolumeId
|
||||
messages []*master_pb.VolumeEcShardInformationMessage
|
||||
datDir string
|
||||
shardCount int
|
||||
}
|
||||
var victims []victim
|
||||
loc.ecVolumesLock.RLock()
|
||||
for vid, ev := range loc.ecVolumes {
|
||||
shardCount := len(ev.Shards)
|
||||
if shardCount >= erasure_coding.DataShardsCount {
|
||||
continue
|
||||
}
|
||||
key := ecKeyForReconcile{collection: ev.Collection, vid: vid}
|
||||
owner, hasDat := datOwners[key]
|
||||
if !hasDat || owner.location == loc {
|
||||
continue
|
||||
}
|
||||
// Decide whether the sibling .dat is a credible source.
|
||||
// Prefer the size baked into .vif at encode time; fall
|
||||
// back to "at least a superblock" for old EC volumes
|
||||
// whose .vif predates the field.
|
||||
requiredDatSize := ev.DatFileSize()
|
||||
if requiredDatSize <= 0 {
|
||||
requiredDatSize = int64(super_block.SuperBlockSize)
|
||||
}
|
||||
if owner.size < requiredDatSize {
|
||||
glog.Warningf("ec volume %d (collection=%q) on %s has only %d shards but sibling .dat on %s is %d bytes (need >= %d); leaving partial EC in place so distributed reconstruction is still possible",
|
||||
vid, ev.Collection, loc.Directory, shardCount, owner.location.Directory, owner.size, requiredDatSize)
|
||||
continue
|
||||
}
|
||||
victims = append(victims, victim{
|
||||
collection: ev.Collection,
|
||||
vid: vid,
|
||||
messages: ev.ToVolumeEcShardInformationMessage(uint32(diskId)),
|
||||
datDir: owner.location.Directory,
|
||||
shardCount: shardCount,
|
||||
})
|
||||
}
|
||||
loc.ecVolumesLock.RUnlock()
|
||||
|
||||
for _, v := range victims {
|
||||
glog.Warningf("ec volume %d (collection=%q) on %s has only %d shards (need %d) while a healthy .dat exists on sibling disk %s; cleaning up leftover EC files (issue 9478)",
|
||||
v.vid, v.collection, loc.Directory, v.shardCount, erasure_coding.DataShardsCount, v.datDir)
|
||||
loc.unloadEcVolume(v.vid)
|
||||
loc.removeEcVolumeFiles(v.collection, v.vid)
|
||||
for _, msg := range v.messages {
|
||||
select {
|
||||
case s.DeletedEcShardsChan <- *msg:
|
||||
default:
|
||||
// Channel full during startup is fine — the next
|
||||
// periodic heartbeat reports the full ecVolumes
|
||||
// state, which no longer contains these shards.
|
||||
glog.V(2).Infof("DeletedEcShardsChan full while pruning ec volume %d; relying on periodic heartbeat", v.vid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// indexDatOwners returns, for every (collection, vid), the first disk on
|
||||
// this store that holds a .dat file for it plus the file's size. Used by
|
||||
// pruneIncompleteEcWithSiblingDat so it can decide whether partial EC
|
||||
// artefacts on another disk are leftovers of an interrupted encode AND
|
||||
// whether the sibling .dat is large enough to be a credible fallback.
|
||||
//
|
||||
// We record any .dat os.ReadDir can see — including zero-byte shells.
|
||||
// The mere presence of a .dat means this volume was a regular volume on
|
||||
// this server at some point, which rules out the "distributed EC, no
|
||||
// .dat anywhere" reading. Whether that .dat is actually usable is the
|
||||
// caller's call, made by comparing this size to the EC's recorded
|
||||
// source size in .vif.
|
||||
func (s *Store) indexDatOwners() map[ecKeyForReconcile]datOwnerInfo {
|
||||
owners := make(map[ecKeyForReconcile]datOwnerInfo)
|
||||
for _, loc := range s.Locations {
|
||||
entries, err := os.ReadDir(loc.Directory)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := entry.Name()
|
||||
if !strings.HasSuffix(name, ".dat") {
|
||||
continue
|
||||
}
|
||||
base := name[:len(name)-len(".dat")]
|
||||
collection, vid, err := parseCollectionVolumeId(base)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
key := ecKeyForReconcile{collection: collection, vid: vid}
|
||||
if _, exists := owners[key]; !exists {
|
||||
owners[key] = datOwnerInfo{location: loc, size: info.Size()}
|
||||
}
|
||||
}
|
||||
}
|
||||
return owners
|
||||
}
|
||||
|
||||
// collectOrphanEcShards walks the disk's data directory and returns the
|
||||
// .ec?? shard files that are present on disk but not yet registered to an
|
||||
// EcVolume in memory. The map is keyed by (collection, vid) so callers can
|
||||
|
||||
Reference in New Issue
Block a user