From 49e83a26cb4e1a2d8fd64862efc1097571e01255 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 27 Apr 2026 16:41:46 -0700 Subject: [PATCH] feat(seaweed-volume): auto-load EC shards on startup (#9212) (#9251) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(seaweed-volume): auto-load EC shards on startup The Rust volume server's load_existing_volumes only scanned .dat files; EC shards on disk stayed invisible until something explicitly issued VolumeEcShardsMount. Strict superset of the issue seaweedfs/seaweedfs#9212 reports for Go: after a fresh restart, every local EC shard was missing from the master's view. Port loadAllEcShards from weed/storage/disk_location_ec.go: - DiskLocation::load_all_ec_shards walks Directory (and IdxDirectory if separate) sorted, groups .ec?? shard files by (collection, vid), validates and mounts each group when its matching .ecx is found. - handle_found_ecx_file: validate_ec_volume + mount_ec_shards path, with cleanup when .dat exists and validation fails (incomplete encoding) or load fails. - check_orphaned_shards: cleans up shard remnants whose .ecx never arrived AND whose stale .dat is still present (interrupted encoding); leaves them on disk otherwise so cross-disk reconciliation / operator recovery can find them. - check_dat_file_exists / parse_collection_volume_id / parse_ec_shard_extension: small helpers mirroring Go's checkDatFileExists, parseCollectionVolumeId, and the `\.ec\d{2,3}` regex. - Wire through load_existing_volumes after the .dat scan; failures log but don't fail the disk's startup. Tests: - test_parse_ec_shard_extension covers .ec00–.ec255 and the rejection of .ec0, .ec999, .ecx, .ecj, .dat, and missing leading dot. - test_load_all_ec_shards_mounts_pairs_with_ecx: shards + .ecx + .vif on disk get mounted into ec_volumes after load_existing_volumes. - test_load_all_ec_shards_keeps_orphan_shards_when_no_dat: orphan shards (no .ecx, no .dat) stay on disk untouched (distributed-EC scenario). - test_load_all_ec_shards_cleans_orphan_shards_when_dat_exists: orphan shards alongside a stale .dat get cleaned up (interrupted-encoding scenario). Prerequisite for porting the cross-disk orphan-shard reconciliation in seaweedfs/seaweedfs#9244 to Rust. * fix(seaweed-volume): dedupe filenames when scanning data + idx dirs load_all_ec_shards scans both `directory` and `idx_directory` (when they differ) so the loop can pair `.ec??` shards with their `.ecx` regardless of which dir owns the index. If the same filename is present in both — possible in idempotent legacy layouts that pre-date `-dir.idx` — the previous implementation processed it twice. mount_ec_shards increments the per-shard `ec_shards` metric inside the loop, so a duplicated `.ec??` entry would double-count the gauge. Use a HashSet while accumulating entries so each filename is processed exactly once. Reported in PR #9251 review by @gemini-code-assist. * fix(seaweed-volume): drive partial-mount cleanup through unmount_ec_shards handle_found_ecx_file calls mount_ec_shards which adds shards one at a time. mount_ec_shards increments the `ec_shards` gauge per shard that successfully attaches. If mount fails halfway, plain ec_volumes.remove(vid) drops the EcVolume but leaves the gauge incremented for whatever did mount. Drive the cleanup branches through unmount_ec_shards instead — it mirror-decrements the gauge per shard and only then drops the EcVolume. Same shape applied to both .dat-exists and distributed-EC fallbacks. Reported in PR #9251 review by @gemini-code-assist. * docs(seaweed-volume): clarify parse_ec_shard_extension shard-id range Doc previously said `.ec00`–`.ec999` but the implementation rejects any shard id > 255 (matches the `EcVolumeShard` u8 typed shard id and Go's `strconv.ParseInt(... 10, 64)` + `> 255` guard). Fix the doc to say `.ec00`–`.ec255` and explain why the 3-digit form is still recognised. Reported in PR #9251 review by @coderabbitai. --- seaweed-volume/src/storage/disk_location.rs | 383 ++++++++++++++++++++ 1 file changed, 383 insertions(+) diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs index 47f36a0fb..b2b359c38 100644 --- a/seaweed-volume/src/storage/disk_location.rs +++ b/seaweed-volume/src/storage/disk_location.rs @@ -202,6 +202,15 @@ impl DiskLocation { } } + // After regular volumes, auto-discover EC shards on disk so a + // fresh restart picks up shards without an explicit + // VolumeEcShardsMount RPC. Mirrors Go's loadExistingVolumes + // calling loadAllEcShards. Failures here only log — they + // shouldn't fail the whole disk's startup. + if let Err(e) = self.load_all_ec_shards() { + warn!(directory = %self.directory, error = %e, "load_all_ec_shards failed"); + } + Ok(()) } @@ -625,6 +634,183 @@ impl DiskLocation { } } + /// Auto-discover .ec?? shard files on disk and mount them. + /// + /// Mirrors `loadAllEcShards` in `weed/storage/disk_location_ec.go`. + /// Reads the data dir (and idx dir, if separate), groups shards by + /// (collection, vid) using sorted iteration so a volume's shards + /// land contiguously next to their `.ecx`. When the matching `.ecx` + /// is encountered, validates and mounts the group. + /// + /// Without this, an EC shard on disk is invisible after a Rust + /// volume-server restart until something explicitly issues + /// VolumeEcShardsMount — strict superset of seaweedfs/seaweedfs#9212. + pub fn load_all_ec_shards(&mut self) -> io::Result<()> { + // Use a HashSet during accumulation so a name appearing in + // both data dir and idx dir (idempotent legacy layouts can + // place .ecx in either) is processed exactly once. Without + // dedup, mount_ec_shards' per-shard metric increment would + // double-count for those filenames. + let mut seen: HashSet = HashSet::new(); + let mut entries: Vec = Vec::new(); + for ent in fs::read_dir(&self.directory)? { + let ent = ent?; + if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) { + continue; + } + let name = ent.file_name().to_string_lossy().into_owned(); + if seen.insert(name.clone()) { + entries.push(name); + } + } + if self.idx_directory != self.directory { + for ent in fs::read_dir(&self.idx_directory)? { + let ent = ent?; + if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) { + continue; + } + let name = ent.file_name().to_string_lossy().into_owned(); + if seen.insert(name.clone()) { + entries.push(name); + } + } + } + entries.sort(); + + let mut same_volume_shards: Vec<(String, u32)> = Vec::new(); // (filename, shard_id) + let mut prev_vid: Option = None; + let mut prev_collection: String = String::new(); + + for name in entries { + let Some(dot) = name.rfind('.') else { + continue; + }; + let (base, ext) = name.split_at(dot); // ext includes leading '.' + let Some((collection, vid)) = parse_collection_volume_id(base) else { + continue; + }; + + if let Some(shard_id) = parse_ec_shard_extension(ext) { + // Ignore zero-byte shards — same as Go. + let path = format!("{}/{}", self.directory, name); + let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0); + if size == 0 { + continue; + } + + let same_group = match prev_vid { + None => true, + Some(p) => p == vid && prev_collection == collection, + }; + if same_group { + same_volume_shards.push((name, shard_id)); + } else { + self.check_orphaned_shards( + &same_volume_shards, + &prev_collection, + prev_vid.unwrap_or(VolumeId(0)), + ); + same_volume_shards = vec![(name, shard_id)]; + } + prev_vid = Some(vid); + prev_collection = collection; + continue; + } + + if ext == ".ecx" + && prev_vid == Some(vid) + && prev_collection == collection + && !same_volume_shards.is_empty() + { + let shards = std::mem::take(&mut same_volume_shards); + self.handle_found_ecx_file(&shards, &collection, vid); + prev_vid = None; + prev_collection.clear(); + } + } + + // Handle any trailing group that never saw a matching .ecx. + self.check_orphaned_shards( + &same_volume_shards, + &prev_collection, + prev_vid.unwrap_or(VolumeId(0)), + ); + + Ok(()) + } + + /// Validate + mount a (collection, vid) group when its `.ecx` is + /// found. Mirrors `handleFoundEcxFile` in + /// `weed/storage/disk_location_ec.go`. + fn handle_found_ecx_file(&mut self, shards: &[(String, u32)], collection: &str, vid: VolumeId) { + let base = volume_file_name(&self.directory, collection, vid); + let dat_path = format!("{}.dat", base); + let dat_exists = check_dat_file_exists(&dat_path); + + if dat_exists && !self.validate_ec_volume(collection, vid) { + warn!( + volume_id = vid.0, + "Incomplete or invalid EC volume: .dat exists but validation failed, cleaning up EC files", + ); + self.remove_ec_volume_files(collection, vid); + return; + } + + let shard_ids: Vec = shards.iter().map(|(_, sid)| *sid).collect(); + if let Err(e) = self.mount_ec_shards(vid, collection, &shard_ids) { + // mount_ec_shards adds shards one at a time and increments + // the per-shard metric for each. If it fails halfway, plain + // ec_volumes.remove(vid) would leak metric increments for + // the shards that did mount. Drive cleanup through + // unmount_ec_shards which mirror-decrements the metric, then + // the empty EcVolume drops itself. + if dat_exists { + warn!( + volume_id = vid.0, + "Failed to load EC shards and .dat exists ({}), cleaning up EC files to use .dat", + e, + ); + self.unmount_ec_shards(vid, &shard_ids); + self.remove_ec_volume_files(collection, vid); + } else { + warn!( + volume_id = vid.0, + "Failed to load EC shards: {} (this may be normal for distributed EC volumes)", + e, + ); + self.unmount_ec_shards(vid, &shard_ids); + } + } + } + + /// Mirrors `checkOrphanedShards`: if shards exist on disk without a + /// matching `.ecx` AND the legacy `.dat` is still present, treat + /// the encoding as interrupted before `.ecx` was written and clean + /// up. If `.dat` is gone, leave files alone — they may be + /// distributed-EC shards waiting for cross-disk reconciliation. + fn check_orphaned_shards( + &self, + shards: &[(String, u32)], + collection: &str, + vid: VolumeId, + ) -> bool { + if shards.is_empty() || vid.0 == 0 { + return false; + } + let base = volume_file_name(&self.directory, collection, vid); + let dat_path = format!("{}.dat", base); + if check_dat_file_exists(&dat_path) { + warn!( + volume_id = vid.0, + "Found {} EC shards without .ecx file (incomplete encoding interrupted before .ecx), cleaning up", + shards.len(), + ); + self.remove_ec_volume_files(collection, vid); + return true; + } + false + } + /// Total number of EC shards on this location. pub fn ec_shard_count(&self) -> usize { self.ec_volumes @@ -697,6 +883,55 @@ fn calculate_expected_shard_size(dat_file_size: i64) -> i64 { } /// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId). +/// Parse a `_` or `` base name into its parts. +/// Mirrors `parseCollectionVolumeId` in +/// `weed/storage/disk_location.go`. Used when iterating raw filenames +/// where the extension has already been stripped. +fn parse_collection_volume_id(base: &str) -> Option<(String, VolumeId)> { + if let Some(pos) = base.rfind('_') { + let collection = &base[..pos]; + let id_str = &base[pos + 1..]; + let id: u32 = id_str.parse().ok()?; + Some((collection.to_string(), VolumeId(id))) + } else { + let id: u32 = base.parse().ok()?; + Some((String::new(), VolumeId(id))) + } +} + +/// Recognise EC shard extensions `.ec00`–`.ec255` (the ShardId u8 +/// range — see `EcVolumeShard`'s typed shard id) and return the +/// shard id. Returns `None` for any other extension. +/// +/// Matches the regex `\.ec\d{2,3}` from the Go side, with the same +/// per-shard `id <= 255` clamp Go's loader applies via +/// `strconv.ParseInt(... 10, 64)` followed by the `if shardId < 0 || +/// shardId > 255` guard. The 3-digit form (`.ec100`–`.ec255`) is +/// retained so the parser can still recognise shards from custom +/// 32+ ratios that fit in a u8 even though OSS only ships 10+4. +fn parse_ec_shard_extension(ext: &str) -> Option { + let rest = ext.strip_prefix(".ec")?; + if rest.len() < 2 || rest.len() > 3 { + return None; + } + let id: u32 = rest.parse().ok()?; + if id > 255 { + return None; + } + Some(id) +} + +/// Robust `.dat` existence check: any unexpected stat error (permission, +/// I/O) is treated as "exists" so we don't misclassify local EC as +/// distributed EC. Mirrors `checkDatFileExists` in Go. +fn check_dat_file_exists(path: &str) -> bool { + match fs::metadata(path) { + Ok(_) => true, + Err(e) if e.kind() == io::ErrorKind::NotFound => false, + Err(_) => true, + } +} + fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> { let stem = filename .strip_suffix(".dat") @@ -982,4 +1217,152 @@ mod tests { .unwrap(); assert_eq!(reloaded.directory_uuid, directory_uuid); } + + /// Sanity: `parse_ec_shard_extension` accepts `.ec00` through + /// `.ec999` and rejects anything else. + #[test] + fn test_parse_ec_shard_extension() { + assert_eq!(parse_ec_shard_extension(".ec00"), Some(0)); + assert_eq!(parse_ec_shard_extension(".ec13"), Some(13)); + assert_eq!(parse_ec_shard_extension(".ec999"), None); // > 255 + assert_eq!(parse_ec_shard_extension(".ec255"), Some(255)); + assert_eq!(parse_ec_shard_extension(".ec0"), None); // need 2 digits + assert_eq!(parse_ec_shard_extension(".ecx"), None); + assert_eq!(parse_ec_shard_extension(".ecj"), None); + assert_eq!(parse_ec_shard_extension(".dat"), None); + assert_eq!(parse_ec_shard_extension("ec00"), None); // missing dot + } + + /// Auto-load: shards + .ecx on the same disk get mounted into + /// `ec_volumes` after `load_existing_volumes` finishes. Without + /// this, EC shards on disk are invisible until something issues + /// `VolumeEcShardsMount`. + #[test] + fn test_load_all_ec_shards_mounts_pairs_with_ecx() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let collection = "grafana-loki"; + let vid = VolumeId(1234); + + // Two shards + .ecx + .ecj + .vif (the post-encode steady state). + for &sid in &[0u8, 4u8] { + let path = format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid); + std::fs::write(&path, b"shard data nonempty").unwrap(); + } + std::fs::write(format!("{}/{}_{}.ecx", dir, collection, vid.0), vec![0u8; 20]) + .unwrap(); + std::fs::write(format!("{}/{}_{}.ecj", dir, collection, vid.0), b"").unwrap(); + std::fs::write( + format!("{}/{}_{}.vif", dir, collection, vid.0), + br#"{"version":3}"#, + ) + .unwrap(); + + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap(); + + let ec_vol = loc.find_ec_volume(vid).expect("EcVolume should be mounted"); + assert_eq!(ec_vol.shard_count(), 2, "both shards should be loaded"); + } + + /// Orphan shards with no `.ecx` and no `.dat` must stay on disk + /// untouched (distributed-EC scenario, where the .ecx lives on + /// another server). Mirrors Go's `checkOrphanedShards` no-op + /// branch. + #[test] + fn test_load_all_ec_shards_keeps_orphan_shards_when_no_dat() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let collection = "grafana-loki"; + let vid = VolumeId(2345); + + // Shards on disk, but no .ecx and no .dat — distributed-EC + // with the .ecx on a sibling/server. + let shard_paths: Vec = [0u8, 12u8] + .iter() + .map(|sid| format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid)) + .collect(); + for p in &shard_paths { + std::fs::write(p, b"shard data nonempty").unwrap(); + } + + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap(); + + // No EcVolume should be mounted (no .ecx). + assert!(loc.find_ec_volume(vid).is_none()); + // But the shard files must still be on disk for cross-disk + // reconciliation / operator recovery. + for p in &shard_paths { + assert!( + std::path::Path::new(p).exists(), + "orphan shard {} was destroyed", + p, + ); + } + } + + /// Orphan shards WITH a stale `.dat` are treated as an + /// interrupted-encoding remnant and cleaned up. Mirrors Go's + /// `checkOrphanedShards` cleanup branch. Guards a subtle case + /// where a partial encoding wrote shards before .ecx and a crash + /// left the .dat behind. + #[test] + fn test_load_all_ec_shards_cleans_orphan_shards_when_dat_exists() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let collection = "grafana-loki"; + let vid = VolumeId(3456); + + let shard_paths: Vec = [0u8, 1u8] + .iter() + .map(|sid| format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid)) + .collect(); + for p in &shard_paths { + std::fs::write(p, b"shard data nonempty").unwrap(); + } + // Stale .dat from interrupted encoding. + let dat_path = format!("{}/{}_{}.dat", dir, collection, vid.0); + std::fs::write(&dat_path, b"unfinished").unwrap(); + + let mut loc = DiskLocation::new( + dir, + dir, + 10, + DiskType::HardDrive, + MinFreeSpace::Percent(0.0), + Vec::new(), + ) + .unwrap(); + // load_existing_volumes processes .dat first; ec auto-load + // runs after and finds the orphans. The .dat itself may or may + // not be loadable as a regular volume — that's not what this + // test is about. Just check that the orphan shard files were + // removed. + let _ = loc.load_existing_volumes(NeedleMapKind::InMemory); + + for p in &shard_paths { + assert!( + !std::path::Path::new(p).exists(), + "orphan shard {} should have been cleaned up because .dat exists", + p, + ); + } + } }