From de28c4df613ff522803152e5abc56b98114450ce Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 13 May 2026 09:25:10 -0700 Subject: [PATCH] fix(storage): prune partial EC shards when sibling disk has healthy .dat (#9478) (#9480) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(storage): prune partial EC shards when sibling disk has healthy .dat (#9478) handleFoundEcxFile only checks for .dat in the same disk location as the EC shards. In a multi-disk volume server an interrupted encode can leave .ec?? + .ecx on disk B while the source .dat still lives on disk A: the per-disk loader sees no .dat next to .ecx, mistakes the leftover for a distributed-EC layout, and mounts the partial shards. The volume server then heartbeats both a regular replica and an EC shard for the same vid and the master keeps both. Sweep the store after per-disk loading and before the cross-disk reconcile to delete partial EC files when a healthy .dat for the same (collection, vid) exists on a sibling disk. Push DeletedEcShardsChan for every pruned shard so master forgets the new-shard message the per-disk pass already emitted, instead of waiting for the next periodic heartbeat. * fix(seaweed-volume): mirror prune of partial EC with sibling .dat (#9478) Rust port of the same Store-level prune added to weed/storage. The per-disk EC loader in disk_location.rs only checks for .dat in the same disk as the EC shards, so an interrupted encode that leaves .ec?? + .ecx on disk B while the source .dat sits on disk A is mounted as if it were a distributed-EC layout. The volume server then heartbeats both a regular replica and an EC shard for the same vid. Sweep the store after per-disk loading and before the cross-disk reconcile, dropping in-memory EcVolumes with fewer than DATA_SHARDS_COUNT shards when a .dat for the same (collection, vid) exists on a sibling disk, and remove all on-disk EC artefacts for them. 
The Rust heartbeat path already diff-emits deletes from the next ec_volumes snapshot, so no explicit delete-channel push is needed here. Tests cover both the issue 9478 layout and a distributed-EC layout with no .dat anywhere on the store, which must be left alone. * fix(storage): validate sibling .dat size before deleting partial EC (#9478) The earlier prune deleted partial EC files whenever any .dat for the same vid existed on a sibling disk — including a zero-byte shell. A shell is no more useful than the partial shard it would replace, and the partial shard might still combine with shards on other servers in a recoverable distributed-EC layout. Wiping it based on a corrupt sibling .dat is data loss masquerading as cleanup. Tighten the check: when the EC's .vif recorded a non-zero source size in datFileSize, require the sibling .dat to be at least that many bytes; otherwise fall back to "at least a superblock". The .vif value is what the encoder wrote at the moment the source was sealed, so a sibling .dat smaller than that is provably truncated. Carry the size through indexDatOwners alongside the location. The Rust port had the same gap and an additional bug behind it: EcVolume::new wasn't reading datFileSize from .vif, so the safety check always fell back to the superblock floor. Wire datFileSize through. The existing shard-size calculation in LocateEcShardNeedleInterval already uses dat_file_size when non-zero, so populating it also matches Go's behaviour there. Tests cover the truncated-sibling case in both ports. 
--- seaweed-volume/src/storage/disk_location.rs | 6 +- .../src/storage/erasure_coding/ec_volume.rs | 21 +- seaweed-volume/src/storage/store.rs | 12 + .../src/storage/store_ec_reconcile.rs | 437 ++++++++++++++++++ weed/storage/erasure_coding/ec_volume.go | 9 + weed/storage/store.go | 10 + weed/storage/store_ec_hybrid_repro_test.go | 257 ++++++++++ weed/storage/store_ec_reconcile.go | 166 +++++++ 8 files changed, 909 insertions(+), 9 deletions(-) create mode 100644 weed/storage/store_ec_hybrid_repro_test.go diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 799f3c1ea..a7ed3f28a 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -304,7 +304,11 @@ impl DiskLocation { } /// Remove all EC-related files for a volume. - fn remove_ec_volume_files(&self, collection: &str, vid: VolumeId) { + /// `pub(crate)` so the Store-level cross-disk passes in + /// `store_ec_reconcile.rs` can call it to scrub partial EC artefacts + /// when a healthy `.dat` for the same vid lives on a sibling disk + /// (seaweedfs/seaweedfs#9478). + pub(crate) fn remove_ec_volume_files(&self, collection: &str, vid: VolumeId) { let base = volume_file_name(&self.directory, collection, vid); let idx_base = volume_file_name(&self.idx_directory, collection, vid); const MAX_SHARD_COUNT: usize = 32; diff --git a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs index 3e0b68f7f..e070fe9e9 100644 --- a/seaweed-volume/src/storage/erasure_coding/ec_volume.rs +++ b/seaweed-volume/src/storage/erasure_coding/ec_volume.rs @@ -120,10 +120,15 @@ impl EcVolume { shards.push(None); } - // Read expire_at_sec and version from .vif if present (matches Go's MaybeLoadVolumeInfo). - // Prefer the data dir; fall back to the idx dir for the - // cross-disk reconcile case (#9212 / #9244). 
- let (expire_at_sec, vif_version) = { + // Read expire_at_sec, version, and dat_file_size from .vif if + // present (matches Go's MaybeLoadVolumeInfo). Prefer the data + // dir; fall back to the idx dir for the cross-disk reconcile + // case (#9212 / #9244). `dat_file_size` is the source .dat + // size at encode time, used both by `locate_ec_shard_needle` + // for shard-size math and by the Store-level prune in + // `store_ec_reconcile.rs` to verify a sibling-disk .dat is + // plausibly the encoding source (#9478). + let (expire_at_sec, vif_version, vif_dat_file_size) = { let vif_path = locate_vif_path(dir, dir_idx, collection, volume_id); if let Ok(vif_content) = std::fs::read_to_string(&vif_path) { if let Ok(vif_info) = @@ -134,12 +139,12 @@ impl EcVolume { } else { Version::current() }; - (vif_info.expire_at_sec, ver) + (vif_info.expire_at_sec, ver, vif_info.dat_file_size) } else { - (0, Version::current()) + (0, Version::current(), 0) } } else { - (0, Version::current()) + (0, Version::current(), 0) } }; @@ -150,7 +155,7 @@ impl EcVolume { dir_idx: dir_idx.to_string(), version: vif_version, shards, - dat_file_size: 0, + dat_file_size: vif_dat_file_size, data_shards, parity_shards, ecx_file: None, diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index cc1f93e3d..baf5a42bd 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -85,6 +85,17 @@ impl Store { self.locations.push(loc); + // First scrub partial EC artefacts left on one disk by an + // interrupted encode while the source .dat still lives on a + // sibling disk of the same store. The per-disk loader cannot + // see the sibling .dat and so loads the partial shards as if + // they were a distributed-EC layout, which makes the volume + // server heartbeat both a regular replica and an EC shard set + // for the same vid (seaweedfs/seaweedfs#9478). 
Running before + // the cross-disk reconcile keeps that pass from later + // re-loading shards we just cleaned up. + self.prune_incomplete_ec_with_sibling_dat(); + // After every disk has finished its per-disk EC scan, sweep // the store for shards that live on a disk without local index // files and load them by reaching across to a sibling disk's @@ -106,6 +117,7 @@ impl Store { tracing::error!("load_new_volumes error in {}: {}", loc.directory, e); } } + self.prune_incomplete_ec_with_sibling_dat(); self.reconcile_ec_shards_across_disks(); } diff --git a/seaweed-volume/src/storage/store_ec_reconcile.rs b/seaweed-volume/src/storage/store_ec_reconcile.rs index 8c75bd63d..20de814d3 100644 --- a/seaweed-volume/src/storage/store_ec_reconcile.rs +++ b/seaweed-volume/src/storage/store_ec_reconcile.rs @@ -21,9 +21,22 @@ use std::fs; use tracing::{info, warn}; use crate::storage::disk_location::{is_ec_shard_extension, parse_collection_volume_id_pub}; +use crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT; use crate::storage::store::Store; +use crate::storage::super_block::SUPER_BLOCK_SIZE; use crate::storage::types::VolumeId; +/// Sibling-disk `.dat` candidate for `prune_incomplete_ec_with_sibling_dat`. +/// We record both the disk index and the file's size: the size is +/// consulted before deleting any EC artefacts. A zero-byte or truncated +/// `.dat` is not a credible fallback, and we'd rather leave the partial +/// EC in place than wipe it based on garbage. +#[derive(Clone, Copy, Debug)] +struct DatOwnerInfo { + location: usize, + size: u64, +} + /// Key for orphan-shard reconciliation: collection + volume id. Two /// collections can re-use the same volume id, and we must only pair /// shards with their own `.ecx`. @@ -136,6 +149,187 @@ impl Store { } } + /// Remove leftover EC artefacts on one disk when a healthy `.dat` + /// for the same `(collection, vid)` lives on a sibling disk of the + /// same store. 
Mirrors `Store.pruneIncompleteEcWithSiblingDat` in + /// `weed/storage/store_ec_reconcile.go` (seaweedfs/seaweedfs#9478). + /// + /// `DiskLocation::handle_found_ecx_file` only looks for `.dat` in + /// the same disk as the EC shards. In a multi-disk volume server an + /// interrupted EC encode can leave `.ec??` + `.ecx` on disk B while + /// the source `.dat` still lives on disk A. The per-disk loader + /// sees no local `.dat`, classifies the layout as distributed EC, + /// and mounts the partial shards. The volume server then heartbeats + /// both a regular replica and an EC shard for the same vid, and + /// master ends up with a contradictory view. + /// + /// Cleanup is gated on `shard_count < DATA_SHARDS_COUNT` so that a + /// deliberate "full local EC, `.dat` retained" layout split across + /// two disks (`.dat` on disk A, all 10+ shards on disk B) is left + /// alone — the per-disk loader already keeps that configuration on + /// a single disk, and pruning it here would be a behaviour + /// regression. Distributed EC volumes (no `.dat` on any disk of + /// this server) also fall through unchanged because the `.dat` + /// index never matches. + /// + /// Before deleting any EC files we also check that the sibling + /// `.dat` is plausibly the encoding source: at least + /// `SUPER_BLOCK_SIZE` bytes long, and — when the EC's `.vif` + /// recorded a non-zero source size in `dat_file_size` — at least + /// that many bytes. A zero-byte shell or a truncated `.dat` does + /// not justify wiping the partial EC, because that EC shard may + /// still combine usefully with shards on other servers in a + /// recoverable distributed-EC layout. 
+ /// + /// We don't have to push anything to a deleted-shards channel + /// here: the Rust heartbeat path in `server/heartbeat.rs` diffs + /// the current `ec_volumes` snapshot against the previous one each + /// tick, so the first heartbeat after the prune naturally emits a + /// delete delta for whatever the per-disk pass had already + /// reported. + pub fn prune_incomplete_ec_with_sibling_dat(&mut self) { + if self.locations.len() < 2 { + return; + } + + let dat_owners = self.index_dat_owners(); + if dat_owners.is_empty() { + return; + } + + // Snapshot under an immutable borrow so we can release it + // before taking the mutable borrow needed for cleanup. + struct Victim { + loc_idx: usize, + collection: String, + vid: VolumeId, + dat_dir: String, + shard_count: usize, + } + let mut victims: Vec<Victim> = Vec::new(); + for (loc_idx, loc) in self.locations.iter().enumerate() { + for (vid, ev) in loc.ec_volumes() { + let shard_count = ev.shard_count(); + if shard_count >= DATA_SHARDS_COUNT { + continue; + } + let key = EcKey { + collection: ev.collection.clone(), + vid: *vid, + }; + let Some(owner) = dat_owners.get(&key) else { + continue; + }; + if owner.location == loc_idx { + // Same-disk .dat is the job of + // DiskLocation::validate_ec_volume during the + // per-disk pass; don't second-guess it here. + continue; + } + // Decide whether the sibling .dat is credible. Prefer + // the size baked into .vif at encode time; fall back + // to "at least a superblock" for old EC volumes whose + // .vif predates the field. 
+ let required = if ev.dat_file_size > 0 { + ev.dat_file_size as u64 + } else { + SUPER_BLOCK_SIZE as u64 + }; + if owner.size < required { + warn!( + volume_id = vid.0, + collection = %ev.collection, + directory = %loc.directory, + shard_count, + sibling_dir = %self.locations[owner.location].directory, + sibling_dat_size = owner.size, + required, + "sibling .dat is smaller than the EC source size; leaving partial EC in place so distributed reconstruction is still possible (issue 9478)", + ); + continue; + } + victims.push(Victim { + loc_idx, + collection: ev.collection.clone(), + vid: *vid, + dat_dir: self.locations[owner.location].directory.clone(), + shard_count, + }); + } + } + + for v in victims { + warn!( + volume_id = v.vid.0, + collection = %v.collection, + directory = %self.locations[v.loc_idx].directory, + shard_count = v.shard_count, + required = DATA_SHARDS_COUNT, + dat_dir = %v.dat_dir, + "partial EC shards on this disk while a healthy .dat exists on a sibling disk; cleaning up leftover EC files (issue 9478)", + ); + let loc = &mut self.locations[v.loc_idx]; + if let Some(mut ec_vol) = loc.remove_ec_volume(v.vid) { + for _ in 0..ec_vol.shard_count() { + crate::metrics::VOLUME_GAUGE + .with_label_values(&[&ec_vol.collection, "ec_shards"]) + .dec(); + } + // destroy() closes shard file handles before unlinking + // (matters on Windows) and removes .ecx / .ecj / .vif. + ec_vol.destroy(); + } + // Also sweep any unmounted shard files (.ec00 .. .ec31) + // that the per-disk loader skipped — destroy() only walks + // the in-memory shards, but the disk may still hold others. + loc.remove_ec_volume_files(&v.collection, v.vid); + } + } + + /// Returns, for every `(collection, vid)`, the index of the first + /// disk on this store that holds a `.dat` for it plus the file's + /// size. 
Used by `prune_incomplete_ec_with_sibling_dat` so it can + /// decide whether partial EC artefacts on another disk are + /// leftovers of an interrupted encode AND whether the sibling + /// `.dat` is large enough to be a credible fallback. + /// + /// Any `.dat` `read_dir` can see is recorded — including zero-byte + /// shells. Their mere presence means this volume was a regular + /// volume on this server at some point, which rules out the + /// "distributed EC, no .dat anywhere" reading. Whether the file is + /// actually usable is the caller's call, made by comparing this + /// size to the EC's recorded source size in `.vif`. + fn index_dat_owners(&self) -> HashMap<EcKey, DatOwnerInfo> { + let mut owners: HashMap<EcKey, DatOwnerInfo> = HashMap::new(); + for (loc_idx, loc) in self.locations.iter().enumerate() { + let Ok(read) = fs::read_dir(&loc.directory) else { + continue; + }; + for ent in read.flatten() { + if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) { + continue; + } + let name = ent.file_name().to_string_lossy().into_owned(); + let Some(base) = name.strip_suffix(".dat") else { + continue; + }; + let Some((collection, vid)) = parse_collection_volume_id_pub(base) else { + continue; + }; + let Ok(meta) = ent.metadata() else { + continue; + }; + owners + .entry(EcKey { collection, vid }) + .or_insert(DatOwnerInfo { + location: loc_idx, + size: meta.len(), + }); + } + } + owners + } + /// Build a `(collection, vid) -> EcxOwnerInfo` map of which disk /// owns the `.ecx` file. `.ecx` normally lives in `IdxDirectory` /// but may have been written into the data directory before @@ -725,6 +919,249 @@ mod tests { assert!(ev.has_shard(0)); assert!(ev.has_shard(1)); } + + /// Reproduces the issue 9478 layout for a single volume server: + /// disk A holds a healthy `.dat` for vid 122; disk B holds a single + /// `.ec01` plus `.ecx` / `.ecj` / `.vif` and no `.dat`. The per-disk + /// loader mounts the partial shard on disk B as if it were a + /// distributed-EC layout. 
The Store-level prune must unmount and + /// delete the leftover EC files on disk B while leaving disk A's + /// `.dat` untouched. + #[test] + fn test_prune_drops_partial_ec_when_sibling_disk_has_dat() { + let tmp = TempDir::new().unwrap(); + let dat_dir = tmp.path().join("sdd"); + let ec_dir = tmp.path().join("sdf"); + std::fs::create_dir_all(&dat_dir).unwrap(); + std::fs::create_dir_all(&ec_dir).unwrap(); + + let collection = ""; + let vid = 122u32; + + // Disk A (sdd): a non-trivial .dat plus the regular-volume + // sidecars. The .dat content doesn't matter for the prune — + // load_existing_volumes refuses to mount it because the + // superblock is absent — but its file name has to be present + // so index_dat_owners records this disk as the .dat owner. + let dat_path = dat_dir.join(format!("{}_{}.dat", collection, vid).trim_start_matches('_')); + std::fs::write(&dat_path, vec![0u8; 1024]).unwrap(); + + // Disk B (sdf): partial EC — one shard, plus .ecx / .ecj / .vif. + write_shard(ec_dir.to_str().unwrap(), collection, vid, 1); + write_index_files(ec_dir.to_str().unwrap(), collection, vid, 10, 4); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dat_dir.to_str().unwrap(), + dat_dir.to_str().unwrap(), + 100, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + store + .add_location( + ec_dir.to_str().unwrap(), + ec_dir.to_str().unwrap(), + 100, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + + // After add_location for sdf, the per-disk loader has mounted + // the partial shard AND the prune has run as part of + // add_location's epilogue. Confirm the prune removed it. 
+ let ec_loc = &store.locations[1]; + assert!( + ec_loc.find_ec_volume(VolumeId(vid)).is_none(), + "partial EC volume should have been unmounted by the prune", + ); + + let ec_base = ec_dir + .join(format!("{}_{}", collection, vid).trim_start_matches('_')) + .to_string_lossy() + .into_owned(); + assert!( + !std::path::Path::new(&format!("{}.ec01", ec_base)).exists(), + "partial shard file should have been removed", + ); + assert!( + !std::path::Path::new(&format!("{}.ecx", ec_base)).exists(), + ".ecx should have been removed", + ); + assert!( + !std::path::Path::new(&format!("{}.ecj", ec_base)).exists(), + ".ecj should have been removed", + ); + + // The .dat on the sibling disk must survive — pruning the + // partial EC must never touch the healthy replica. + assert!( + dat_path.exists(), + "healthy .dat on sibling disk should be left alone", + ); + } + + /// Safety guard for `prune_incomplete_ec_with_sibling_dat`: when + /// the sibling `.dat` is smaller than the source size recorded in + /// the EC's `.vif`, the `.dat` is clearly truncated and is not a + /// credible fallback. The partial shard may still combine with + /// shards on other servers in a recoverable distributed-EC layout, + /// so we leave the EC files in place. + #[test] + fn test_prune_keeps_partial_ec_when_sibling_dat_is_smaller_than_vif_source_size() { + let tmp = TempDir::new().unwrap(); + let dat_dir = tmp.path().join("sdd"); + let ec_dir = tmp.path().join("sdf"); + std::fs::create_dir_all(&dat_dir).unwrap(); + std::fs::create_dir_all(&ec_dir).unwrap(); + + let collection = "logs"; + let vid = 122u32; + + // Tiny but loadable .dat on sdd: a valid 8-byte version-3 + // superblock so Volume::new succeeds, padded out to a few + // hundred bytes so we have something well below the .vif- + // recorded source size below. Anything smaller would let + // Volume::new's auto-write extend the file to SUPER_BLOCK_SIZE + // and obscure the truncation we're trying to model. 
+ let dat_path = dat_dir.join(format!("{}_{}.dat", collection, vid)); + let mut dat_bytes = crate::storage::super_block::SuperBlock::default().to_bytes(); + dat_bytes.resize(256, 0u8); + std::fs::write(&dat_path, &dat_bytes).unwrap(); + let sibling_dat_size = dat_bytes.len() as u64; + let source_size = 10 * 1024 * 1024u64; // 10 MiB recorded in .vif + assert!(sibling_dat_size < source_size); + + // Partial EC: one shard plus .ecx / .ecj / .vif. The .vif + // records the source .dat size at encode time; the safety + // check compares the sibling .dat against this value. + write_shard(ec_dir.to_str().unwrap(), collection, vid, 1); + let vif = VifVolumeInfo { + version: 3, + dat_file_size: source_size as i64, + ec_shard_config: Some(VifEcShardConfig { + data_shards: 10, + parity_shards: 4, + }), + ..Default::default() + }; + std::fs::write( + ec_dir.join(format!("{}_{}.vif", collection, vid)), + serde_json::to_string(&vif).unwrap(), + ) + .unwrap(); + std::fs::write( + ec_dir.join(format!("{}_{}.ecx", collection, vid)), + vec![0u8; 20], + ) + .unwrap(); + std::fs::write( + ec_dir.join(format!("{}_{}.ecj", collection, vid)), + b"", + ) + .unwrap(); + + let mut store = Store::new(NeedleMapKind::InMemory); + store + .add_location( + dat_dir.to_str().unwrap(), + dat_dir.to_str().unwrap(), + 100, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + store + .add_location( + ec_dir.to_str().unwrap(), + ec_dir.to_str().unwrap(), + 100, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + + // Prune ran as part of add_location's epilogue. Confirm the + // partial EC was left alone because the sibling .dat is too + // small to be the encoding source. 
+ let ec_loc = &store.locations[1]; + let ev = ec_loc + .find_ec_volume(VolumeId(vid)) + .expect("partial EC volume must remain mounted when the sibling .dat is smaller than the .vif source size"); + assert_eq!(ev.shard_count(), 1); + + let ec_base = ec_dir + .join(format!("{}_{}", collection, vid)) + .to_string_lossy() + .into_owned(); + assert!( + std::path::Path::new(&format!("{}.ec01", ec_base)).exists(), + "partial shard file must survive when the sibling .dat is truncated", + ); + assert!( + std::path::Path::new(&format!("{}.ecx", ec_base)).exists(), + ".ecx must survive when the sibling .dat is truncated", + ); + } + + /// A distributed EC volume — no `.dat` on any disk of this store — + /// must not be touched by the prune even when shard count is below + /// `DATA_SHARDS_COUNT`. Partial shard layouts are normal for + /// distributed EC. + #[test] + fn test_prune_leaves_distributed_ec_alone() { + let tmp = TempDir::new().unwrap(); + let dir0 = tmp.path().join("data0"); + let dir1 = tmp.path().join("data1"); + std::fs::create_dir_all(&dir0).unwrap(); + std::fs::create_dir_all(&dir1).unwrap(); + + let collection = "logs"; + let vid = 7u32; + + // Two shards on dir0 with .ecx, no .dat anywhere. 
+ write_shard(dir0.to_str().unwrap(), collection, vid, 1); + write_shard(dir0.to_str().unwrap(), collection, vid, 2); + write_index_files(dir0.to_str().unwrap(), collection, vid, 10, 4); + + let mut store = Store::new(NeedleMapKind::InMemory); + for dir in [&dir0, &dir1] { + store + .add_location( + dir.to_str().unwrap(), + dir.to_str().unwrap(), + 100, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + } + + let ev = store.locations[0] + .find_ec_volume(VolumeId(vid)) + .expect("distributed EC volume should still be mounted on dir0"); + assert_eq!( + ev.shard_count(), + 2, + "no .dat anywhere on this store means the partial shards must be kept as distributed EC", + ); + + let ec_base = dir0 + .join(format!("{}_{}", collection, vid)) + .to_string_lossy() + .into_owned(); + assert!(std::path::Path::new(&format!("{}.ec01", ec_base)).exists()); + assert!(std::path::Path::new(&format!("{}.ec02", ec_base)).exists()); + assert!(std::path::Path::new(&format!("{}.ecx", ec_base)).exists()); + } } /// Walk a disk's data directory and return the `.ec??` shard files diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index b5a966f2d..fa24d2118 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -288,6 +288,15 @@ func (ev *EcVolume) ShardSize() uint64 { return 0 } +// DatFileSize returns the source .dat file size as recorded in .vif at +// EC encoding time. Zero for old EC volumes whose .vif predates the +// field, or for .vif files we failed to parse. Used by the Store-level +// prune in store_ec_reconcile.go to validate that a sibling-disk .dat +// is plausibly the encoding source before deleting the partial EC. 
+func (ev *EcVolume) DatFileSize() int64 { + return ev.datFileSize +} + func (ev *EcVolume) Size() (size uint64) { for _, shard := range ev.Shards { if shardSize := shard.Size(); shardSize > 0 { diff --git a/weed/storage/store.go b/weed/storage/store.go index 9d0c19535..6c31633c0 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -154,6 +154,16 @@ func NewStore( } wg.Wait() + // First, scrub partial EC artefacts left on one disk by an interrupted + // encode while the source .dat still lives on a sibling disk of the + // same store. The per-disk loader cannot see the sibling .dat and so + // loads the partial shards as if they were a distributed-EC layout, + // which makes the volume server heartbeat both a regular replica and + // an EC shard set for the same vid (issue #9478). Running before the + // cross-disk reconcile keeps that pass from later re-loading shards + // we just cleaned up. + s.pruneIncompleteEcWithSiblingDat() + // After every DiskLocation has finished its per-disk EC scan, sweep the // store for shards that live on a disk without local index files and // load them by reaching across to a sibling disk's .ecx / .ecj / .vif. diff --git a/weed/storage/store_ec_hybrid_repro_test.go b/weed/storage/store_ec_hybrid_repro_test.go new file mode 100644 index 000000000..db245e8c7 --- /dev/null +++ b/weed/storage/store_ec_hybrid_repro_test.go @@ -0,0 +1,257 @@ +package storage + +import ( + "os" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestIssue9478_PartialEcOnSiblingDiskOfHealthyDat reproduces +// https://github.com/seaweedfs/seaweedfs/issues/9478: +// +// - A single volume server has two disks. Disk A holds a healthy +// volume_x.dat (with .idx + .vif). 
Disk B holds the leftovers of an +// interrupted EC encode for the SAME volume: a single .ec?? shard plus +// .ecx + .ecj + .vif, with no .dat next to them. +// +// The current per-disk EC loader (handleFoundEcxFile / validateEcVolume) only +// checks for .dat in the same DiskLocation as the EC shards. Because Disk B +// has no .dat, it concludes "this must be a distributed EC volume" and +// happily loads the lone partial shard. The .dat on Disk A also loads as a +// regular replica. The volume server then heartbeats BOTH a regular volume +// and an EC shard for the same vid, which is the inconsistent state the +// issue is describing. +// +// The expected behaviour is to recognise that a healthy .dat exists on a +// sibling disk on the same store and clean up the partial EC files on +// Disk B, exactly as we already do today when .dat and the partial EC files +// sit on the same disk. +func TestIssue9478_PartialEcOnSiblingDiskOfHealthyDat(t *testing.T) { + collection := "" + vid := needle.VolumeId(122) + + root := t.TempDir() + datDir := root + "/sdd" + ecDir := root + "/sdf" + if err := os.MkdirAll(datDir, 0o755); err != nil { + t.Fatalf("mkdir datDir: %v", err) + } + if err := os.MkdirAll(ecDir, 0o755); err != nil { + t.Fatalf("mkdir ecDir: %v", err) + } + + // Disk A (sdd): a healthy-looking volume_x.dat + .idx + .vif. We don't + // run NewVolume here, only check what loadAllEcShards on Disk B does in + // the presence of a same-server .dat. A truncated 10 MiB .dat is enough + // for the EC validator to compute an expected shard size from. 
+ datFileSize := int64(10 * 1024 * 1024) + datBase := erasure_coding.EcShardFileName(collection, datDir, int(vid)) + if f, err := os.Create(datBase + ".dat"); err == nil { + if err := f.Truncate(datFileSize); err != nil { + t.Fatalf("truncate dat: %v", err) + } + f.Close() + } else { + t.Fatalf("create dat: %v", err) + } + if f, err := os.Create(datBase + ".idx"); err == nil { + f.Close() + } + if f, err := os.Create(datBase + ".vif"); err == nil { + f.Close() + } + + // Disk B (sdf): only one EC shard plus .ecx + .ecj + .vif, no .dat. + // This mirrors the issue 9478 listing for volume_server_4 / sdf. + ecBase := erasure_coding.EcShardFileName(collection, ecDir, int(vid)) + expectedShardSize := calculateExpectedShardSize(datFileSize, erasure_coding.DataShardsCount) + if f, err := os.Create(ecBase + erasure_coding.ToExt(1)); err == nil { + if err := f.Truncate(expectedShardSize); err != nil { + t.Fatalf("truncate shard: %v", err) + } + f.Close() + } else { + t.Fatalf("create shard: %v", err) + } + if f, err := os.Create(ecBase + ".ecx"); err == nil { + f.WriteString("dummy ecx") + f.Close() + } + if f, err := os.Create(ecBase + ".ecj"); err == nil { + f.Close() + } + if f, err := os.Create(ecBase + ".vif"); err == nil { + f.Close() + } + + minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"} + makeDisk := func(dir string) *DiskLocation { + dl := &DiskLocation{ + Directory: dir, + DirectoryUuid: "test-uuid-" + dir, + IdxDirectory: dir, + DiskType: types.HddType, + MinFreeSpace: minFreeSpace, + } + dl.volumes = make(map[needle.VolumeId]*Volume) + dl.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) + return dl + } + datLoc := makeDisk(datDir) + ecLoc := makeDisk(ecDir) + + // Stand the disks up the same way NewStore does: per-disk EC scan + // first, then the Store-level passes. 
The per-disk pass alone cannot + // see the .dat on the sibling disk, so it mounts the partial shards; + // the Store-level prune is what brings the cluster back to a + // consistent state. + store := &Store{ + Locations: []*DiskLocation{datLoc, ecLoc}, + NewEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16), + DeletedEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16), + } + ecLoc.ecShardNotifyHandler = func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume) { + store.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{ + Id: uint32(vid), + Collection: collection, + } + } + + if err := ecLoc.loadAllEcShards(ecLoc.ecShardNotifyHandler); err != nil { + t.Logf("loadAllEcShards on ecDir returned: %v", err) + } + t.Cleanup(func() { closeEcVolumes(ecLoc) }) + + preShardCount := ecLoc.EcShardCount() + preNewMessages := len(store.NewEcShardsChan) + t.Logf("after per-disk EC load on sdf: inMemoryShards=%d newEcShardMessages=%d", preShardCount, preNewMessages) + if preShardCount == 0 { + t.Fatalf("test setup is no longer reproducing the bug: per-disk EC load did not mount the partial shard") + } + + store.pruneIncompleteEcWithSiblingDat() + store.reconcileEcShardsAcrossDisks() + + leftoverShard := util.FileExists(ecBase + erasure_coding.ToExt(1)) + leftoverEcx := util.FileExists(ecBase + ".ecx") + leftoverEcj := util.FileExists(ecBase + ".ecj") + loaded := ecLoc.EcShardCount() + deleteMessages := len(store.DeletedEcShardsChan) + + t.Logf("after Store-level cleanup: shardFileLeft=%v ecxLeft=%v ecjLeft=%v inMemoryShards=%d deletedEcShardMessages=%d", + leftoverShard, leftoverEcx, leftoverEcj, loaded, deleteMessages) + + if loaded != 0 { + t.Fatalf("partial EC shard was still mounted (%d shards) after the Store-level prune; a healthy .dat exists on a sibling disk and the leftover should have been cleaned up", loaded) + } + if leftoverShard || leftoverEcx || leftoverEcj { + 
t.Fatalf("partial EC files survived the Store-level prune (shard=%v ecx=%v ecj=%v); a healthy .dat exists on a sibling disk and the leftover should have been removed", + leftoverShard, leftoverEcx, leftoverEcj) + } + if deleteMessages == 0 { + t.Errorf("prune did not push DeletedEcShardsChan; master would have to wait for the next periodic heartbeat to forget the partial shard") + } + + // The .dat on the sibling disk must survive — pruning the partial EC + // must never touch the healthy replica we are falling back to. + if !util.FileExists(datBase + ".dat") { + t.Fatalf("healthy .dat on sibling disk was removed by the prune; only the partial EC files should be cleaned up") + } +} + +// TestIssue9478_ZeroByteSiblingDatKeepsPartialEc is the safety guard for +// pruneIncompleteEcWithSiblingDat: when the sibling .dat is zero bytes +// (or smaller than what .vif recorded as the encoding source), we'd +// rather keep the partial EC than delete it based on garbage. The +// partial shard may still combine with shards on other servers in a +// recoverable distributed-EC layout. +func TestIssue9478_ZeroByteSiblingDatKeepsPartialEc(t *testing.T) { + collection := "" + vid := needle.VolumeId(122) + + root := t.TempDir() + datDir := root + "/sdd" + ecDir := root + "/sdf" + if err := os.MkdirAll(datDir, 0o755); err != nil { + t.Fatalf("mkdir datDir: %v", err) + } + if err := os.MkdirAll(ecDir, 0o755); err != nil { + t.Fatalf("mkdir ecDir: %v", err) + } + + // Sibling .dat is a zero-byte shell — the kind volume servers 3 + // and 5 in the issue 9478 report have. 
+ datBase := erasure_coding.EcShardFileName(collection, datDir, int(vid)) + if f, err := os.Create(datBase + ".dat"); err == nil { + f.Close() + } else { + t.Fatalf("create zero-byte dat: %v", err) + } + + ecBase := erasure_coding.EcShardFileName(collection, ecDir, int(vid)) + datFileSizeForShard := int64(10 * 1024 * 1024) + expectedShardSize := calculateExpectedShardSize(datFileSizeForShard, erasure_coding.DataShardsCount) + if f, err := os.Create(ecBase + erasure_coding.ToExt(1)); err == nil { + if err := f.Truncate(expectedShardSize); err != nil { + t.Fatalf("truncate shard: %v", err) + } + f.Close() + } + if f, err := os.Create(ecBase + ".ecx"); err == nil { + f.WriteString("dummy ecx") + f.Close() + } + if f, err := os.Create(ecBase + ".ecj"); err == nil { + f.Close() + } + if f, err := os.Create(ecBase + ".vif"); err == nil { + f.Close() + } + + minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"} + makeDisk := func(dir string) *DiskLocation { + dl := &DiskLocation{ + Directory: dir, + DirectoryUuid: "test-uuid-" + dir, + IdxDirectory: dir, + DiskType: types.HddType, + MinFreeSpace: minFreeSpace, + } + dl.volumes = make(map[needle.VolumeId]*Volume) + dl.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) + return dl + } + datLoc := makeDisk(datDir) + ecLoc := makeDisk(ecDir) + + store := &Store{ + Locations: []*DiskLocation{datLoc, ecLoc}, + NewEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16), + DeletedEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, 16), + } + + if err := ecLoc.loadAllEcShards(nil); err != nil { + t.Logf("loadAllEcShards on ecDir returned: %v", err) + } + t.Cleanup(func() { closeEcVolumes(ecLoc) }) + if ecLoc.EcShardCount() == 0 { + t.Fatalf("test setup is not reproducing the layout: per-disk EC load did not mount the partial shard") + } + + store.pruneIncompleteEcWithSiblingDat() + + if ecLoc.EcShardCount() == 0 { + t.Fatalf("prune deleted partial EC based on a 
zero-byte sibling .dat; the shard should have been left in place so distributed reconstruction is still possible") + } + if !util.FileExists(ecBase + erasure_coding.ToExt(1)) { + t.Fatalf("prune deleted the partial shard file based on a zero-byte sibling .dat") + } + if len(store.DeletedEcShardsChan) != 0 { + t.Errorf("prune pushed a DeletedEcShardsChan message despite skipping cleanup; nothing was actually deleted") + } +} diff --git a/weed/storage/store_ec_reconcile.go b/weed/storage/store_ec_reconcile.go index afb7ca65d..7b0923331 100644 --- a/weed/storage/store_ec_reconcile.go +++ b/weed/storage/store_ec_reconcile.go @@ -7,10 +7,22 @@ import ( "strings" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" ) +// datOwnerInfo records both the disk that holds a .dat for a given +// (collection, vid) and the size on disk. The size is consulted by +// pruneIncompleteEcWithSiblingDat before deleting any EC artefacts: +// a zero-byte or truncated .dat is not a credible fallback, and we'd +// rather leave the partial EC in place than wipe it based on garbage. +type datOwnerInfo struct { + location *DiskLocation + size int64 +} + // ecKeyForReconcile keys orphan-shard reconciliation by collection + volume // id. Per-collection grouping matters because two collections can re-use the // same volume id, and we must only pair shards with their own .ecx file. @@ -125,6 +137,160 @@ func (s *Store) indexEcxOwners() map[ecKeyForReconcile]ecxOwnerInfo { return owners } +// pruneIncompleteEcWithSiblingDat removes leftover EC artefacts on one +// disk when a healthy .dat for the same (collection, vid) lives on a +// sibling disk of the same store. 
This is the cross-disk analogue of the +// validateEcVolume cleanup in handleFoundEcxFile: a same-disk .dat next +// to partial shards is already taken as proof that an EC encode was +// interrupted, and the partial shards get removed so the .dat keeps +// serving the volume. Per-disk loaders cannot see sibling disks, so when +// the .dat ends up on disk A and the partial shards on disk B the per-disk +// pass mistakes the leftover for a normal distributed-EC layout (no .dat +// next to .ecx) and mounts the partial shards. The volume server then +// heartbeats both a regular replica and an EC shard for the same vid, the +// master keeps both entries, and reads route through either path +// depending on the client. Issue 9478. +// +// Cleanup is gated on shardCount < DataShardsCount so that a deliberate +// "full local EC, .dat retained" layout split across two disks (.dat on +// disk A, all 10+ shards on disk B) is left alone — the per-disk loader +// already keeps that configuration when everything is on a single disk, +// and pruning it here would be a behaviour regression for operators who +// rely on it. Distributed EC volumes (no .dat on any disk of this server) +// also fall through unchanged because the lookup in the .dat index below +// will simply not find a match. +// +// Before deleting any EC files we also check that the sibling .dat is +// plausibly the encoding source: at least super_block.SuperBlockSize +// bytes long, and — when the EC's .vif recorded a non-zero source size +// in datFileSize — at least that many bytes. A zero-byte shell or a +// truncated .dat does not justify wiping the partial EC, because that +// EC shard may still combine usefully with shards on other servers in +// a recoverable distributed-EC layout. 
+// +// We push DeletedEcShardsChan for every pruned shard so the master is told +// to forget the registrations the per-disk pass already emitted on +// NewEcShardsChan during startup, instead of waiting for the first +// periodic heartbeat to reconcile. +func (s *Store) pruneIncompleteEcWithSiblingDat() { + if len(s.Locations) < 2 { + return + } + + datOwners := s.indexDatOwners() + if len(datOwners) == 0 { + return + } + + for diskId, loc := range s.Locations { + // Snapshot under the read lock so we are not iterating + // ecVolumes while the cleanup below takes the write lock. + type victim struct { + collection string + vid needle.VolumeId + messages []*master_pb.VolumeEcShardInformationMessage + datDir string + shardCount int + } + var victims []victim + loc.ecVolumesLock.RLock() + for vid, ev := range loc.ecVolumes { + shardCount := len(ev.Shards) + if shardCount >= erasure_coding.DataShardsCount { + continue + } + key := ecKeyForReconcile{collection: ev.Collection, vid: vid} + owner, hasDat := datOwners[key] + if !hasDat || owner.location == loc { + continue + } + // Decide whether the sibling .dat is a credible source. + // Prefer the size baked into .vif at encode time; fall + // back to "at least a superblock" for old EC volumes + // whose .vif predates the field. 
+ requiredDatSize := ev.DatFileSize() + if requiredDatSize <= 0 { + requiredDatSize = int64(super_block.SuperBlockSize) + } + if owner.size < requiredDatSize { + glog.Warningf("ec volume %d (collection=%q) on %s has only %d shards but sibling .dat on %s is %d bytes (need >= %d); leaving partial EC in place so distributed reconstruction is still possible", + vid, ev.Collection, loc.Directory, shardCount, owner.location.Directory, owner.size, requiredDatSize) + continue + } + victims = append(victims, victim{ + collection: ev.Collection, + vid: vid, + messages: ev.ToVolumeEcShardInformationMessage(uint32(diskId)), + datDir: owner.location.Directory, + shardCount: shardCount, + }) + } + loc.ecVolumesLock.RUnlock() + + for _, v := range victims { + glog.Warningf("ec volume %d (collection=%q) on %s has only %d shards (need %d) while a healthy .dat exists on sibling disk %s; cleaning up leftover EC files (issue 9478)", + v.vid, v.collection, loc.Directory, v.shardCount, erasure_coding.DataShardsCount, v.datDir) + loc.unloadEcVolume(v.vid) + loc.removeEcVolumeFiles(v.collection, v.vid) + for _, msg := range v.messages { + select { + case s.DeletedEcShardsChan <- *msg: + default: + // Channel full during startup is fine — the next + // periodic heartbeat reports the full ecVolumes + // state, which no longer contains these shards. + glog.V(2).Infof("DeletedEcShardsChan full while pruning ec volume %d; relying on periodic heartbeat", v.vid) + } + } + } + } +} + +// indexDatOwners returns, for every (collection, vid), the first disk on +// this store that holds a .dat file for it plus the file's size. Used by +// pruneIncompleteEcWithSiblingDat so it can decide whether partial EC +// artefacts on another disk are leftovers of an interrupted encode AND +// whether the sibling .dat is large enough to be a credible fallback. +// +// We record any .dat os.ReadDir can see — including zero-byte shells. 
+// The mere presence of a .dat means this volume was a regular volume on +// this server at some point, which rules out the "distributed EC, no +// .dat anywhere" reading. Whether that .dat is actually usable is the +// caller's call, made by comparing this size to the EC's recorded +// source size in .vif. +func (s *Store) indexDatOwners() map[ecKeyForReconcile]datOwnerInfo { + owners := make(map[ecKeyForReconcile]datOwnerInfo) + for _, loc := range s.Locations { + entries, err := os.ReadDir(loc.Directory) + if err != nil { + continue + } + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if !strings.HasSuffix(name, ".dat") { + continue + } + base := name[:len(name)-len(".dat")] + collection, vid, err := parseCollectionVolumeId(base) + if err != nil { + continue + } + info, err := entry.Info() + if err != nil { + continue + } + key := ecKeyForReconcile{collection: collection, vid: vid} + if _, exists := owners[key]; !exists { + owners[key] = datOwnerInfo{location: loc, size: info.Size()} + } + } + } + return owners +} + // collectOrphanEcShards walks the disk's data directory and returns the // .ec?? shard files that are present on disk but not yet registered to an // EcVolume in memory. The map is keyed by (collection, vid) so callers can