diff --git a/seaweed-volume/src/storage/disk_location.rs b/seaweed-volume/src/storage/disk_location.rs
index 47f36a0fb..b2b359c38 100644
--- a/seaweed-volume/src/storage/disk_location.rs
+++ b/seaweed-volume/src/storage/disk_location.rs
@@ -202,6 +202,15 @@ impl DiskLocation {
             }
         }
 
+        // After regular volumes, auto-discover EC shards on disk so a
+        // fresh restart picks up shards without an explicit
+        // VolumeEcShardsMount RPC. Mirrors Go's loadExistingVolumes
+        // calling loadAllEcShards. Failures here only log — they
+        // shouldn't fail the whole disk's startup.
+        if let Err(e) = self.load_all_ec_shards() {
+            warn!(directory = %self.directory, error = %e, "load_all_ec_shards failed");
+        }
+
         Ok(())
     }
 
@@ -625,6 +634,183 @@ impl DiskLocation {
         }
     }
 
+    /// Auto-discover .ec?? shard files on disk and mount them.
+    ///
+    /// Mirrors `loadAllEcShards` in `weed/storage/disk_location_ec.go`.
+    /// Reads the data dir (and idx dir, if separate), groups shards by
+    /// (collection, vid) using sorted iteration so a volume's shards
+    /// land contiguously next to their `.ecx`. When the matching `.ecx`
+    /// is encountered, validates and mounts the group.
+    ///
+    /// Without this, an EC shard on disk is invisible after a Rust
+    /// volume-server restart until something explicitly issues
+    /// VolumeEcShardsMount — strict superset of seaweedfs/seaweedfs#9212.
+    pub fn load_all_ec_shards(&mut self) -> io::Result<()> {
+        // Use a HashSet during accumulation so a name appearing in
+        // both data dir and idx dir (idempotent legacy layouts can
+        // place .ecx in either) is processed exactly once. Without
+        // dedup, mount_ec_shards' per-shard metric increment would
+        // double-count for those filenames.
+        let mut seen: HashSet<String> = HashSet::new();
+        let mut entries: Vec<String> = Vec::new();
+        for ent in fs::read_dir(&self.directory)? {
+            let ent = ent?;
+            if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
+                continue;
+            }
+            let name = ent.file_name().to_string_lossy().into_owned();
+            if seen.insert(name.clone()) {
+                entries.push(name);
+            }
+        }
+        if self.idx_directory != self.directory {
+            for ent in fs::read_dir(&self.idx_directory)? {
+                let ent = ent?;
+                if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
+                    continue;
+                }
+                let name = ent.file_name().to_string_lossy().into_owned();
+                if seen.insert(name.clone()) {
+                    entries.push(name);
+                }
+            }
+        }
+        entries.sort();
+
+        let mut same_volume_shards: Vec<(String, u32)> = Vec::new(); // (filename, shard_id)
+        let mut prev_vid: Option<VolumeId> = None;
+        let mut prev_collection: String = String::new();
+
+        for name in entries {
+            let Some(dot) = name.rfind('.') else {
+                continue;
+            };
+            let (base, ext) = name.split_at(dot); // ext includes leading '.'
+            let Some((collection, vid)) = parse_collection_volume_id(base) else {
+                continue;
+            };
+
+            if let Some(shard_id) = parse_ec_shard_extension(ext) {
+                // Ignore zero-byte shards — same as Go.
+                let path = format!("{}/{}", self.directory, name);
+                let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
+                if size == 0 {
+                    continue;
+                }
+
+                let same_group = match prev_vid {
+                    None => true,
+                    Some(p) => p == vid && prev_collection == collection,
+                };
+                if same_group {
+                    same_volume_shards.push((name, shard_id));
+                } else {
+                    self.check_orphaned_shards(
+                        &same_volume_shards,
+                        &prev_collection,
+                        prev_vid.unwrap_or(VolumeId(0)),
+                    );
+                    same_volume_shards = vec![(name, shard_id)];
+                }
+                prev_vid = Some(vid);
+                prev_collection = collection;
+                continue;
+            }
+
+            if ext == ".ecx"
+                && prev_vid == Some(vid)
+                && prev_collection == collection
+                && !same_volume_shards.is_empty()
+            {
+                let shards = std::mem::take(&mut same_volume_shards);
+                self.handle_found_ecx_file(&shards, &collection, vid);
+                prev_vid = None;
+                prev_collection.clear();
+            }
+        }
+
+        // Handle any trailing group that never saw a matching .ecx.
+        self.check_orphaned_shards(
+            &same_volume_shards,
+            &prev_collection,
+            prev_vid.unwrap_or(VolumeId(0)),
+        );
+
+        Ok(())
+    }
+
+    /// Validate + mount a (collection, vid) group when its `.ecx` is
+    /// found. Mirrors `handleFoundEcxFile` in
+    /// `weed/storage/disk_location_ec.go`.
+    fn handle_found_ecx_file(&mut self, shards: &[(String, u32)], collection: &str, vid: VolumeId) {
+        let base = volume_file_name(&self.directory, collection, vid);
+        let dat_path = format!("{}.dat", base);
+        let dat_exists = check_dat_file_exists(&dat_path);
+
+        if dat_exists && !self.validate_ec_volume(collection, vid) {
+            warn!(
+                volume_id = vid.0,
+                "Incomplete or invalid EC volume: .dat exists but validation failed, cleaning up EC files",
+            );
+            self.remove_ec_volume_files(collection, vid);
+            return;
+        }
+
+        let shard_ids: Vec<u32> = shards.iter().map(|(_, sid)| *sid).collect();
+        if let Err(e) = self.mount_ec_shards(vid, collection, &shard_ids) {
+            // mount_ec_shards adds shards one at a time and increments
+            // the per-shard metric for each. If it fails halfway, plain
+            // ec_volumes.remove(vid) would leak metric increments for
+            // the shards that did mount. Drive cleanup through
+            // unmount_ec_shards which mirror-decrements the metric, then
+            // the empty EcVolume drops itself.
+            if dat_exists {
+                warn!(
+                    volume_id = vid.0,
+                    "Failed to load EC shards and .dat exists ({}), cleaning up EC files to use .dat",
+                    e,
+                );
+                self.unmount_ec_shards(vid, &shard_ids);
+                self.remove_ec_volume_files(collection, vid);
+            } else {
+                warn!(
+                    volume_id = vid.0,
+                    "Failed to load EC shards: {} (this may be normal for distributed EC volumes)",
+                    e,
+                );
+                self.unmount_ec_shards(vid, &shard_ids);
+            }
+        }
+    }
+
+    /// Mirrors `checkOrphanedShards`: if shards exist on disk without a
+    /// matching `.ecx` AND the legacy `.dat` is still present, treat
+    /// the encoding as interrupted before `.ecx` was written and clean
+    /// up. If `.dat` is gone, leave files alone — they may be
+    /// distributed-EC shards waiting for cross-disk reconciliation.
+    fn check_orphaned_shards(
+        &self,
+        shards: &[(String, u32)],
+        collection: &str,
+        vid: VolumeId,
+    ) -> bool {
+        if shards.is_empty() || vid.0 == 0 {
+            return false;
+        }
+        let base = volume_file_name(&self.directory, collection, vid);
+        let dat_path = format!("{}.dat", base);
+        if check_dat_file_exists(&dat_path) {
+            warn!(
+                volume_id = vid.0,
+                "Found {} EC shards without .ecx file (incomplete encoding interrupted before .ecx), cleaning up",
+                shards.len(),
+            );
+            self.remove_ec_volume_files(collection, vid);
+            return true;
+        }
+        false
+    }
+
     /// Total number of EC shards on this location.
     pub fn ec_shard_count(&self) -> usize {
         self.ec_volumes
@@ -697,6 +883,57 @@ fn calculate_expected_shard_size(dat_file_size: i64) -> i64 {
 }
 
+/// Parse a `<collection>_<vid>` or `<vid>` base name into its parts.
+/// Mirrors `parseCollectionVolumeId` in
+/// `weed/storage/disk_location.go`. Used when iterating raw filenames
+/// where the extension has already been stripped.
+fn parse_collection_volume_id(base: &str) -> Option<(String, VolumeId)> {
+    if let Some(pos) = base.rfind('_') {
+        let collection = &base[..pos];
+        let id_str = &base[pos + 1..];
+        let id: u32 = id_str.parse().ok()?;
+        Some((collection.to_string(), VolumeId(id)))
+    } else {
+        let id: u32 = base.parse().ok()?;
+        Some((String::new(), VolumeId(id)))
+    }
+}
+
+/// Recognise EC shard extensions `.ec00`–`.ec255` (the ShardId u8
+/// range — see `EcVolumeShard`'s typed shard id) and return the
+/// shard id. Returns `None` for any other extension.
+///
+/// Matches the regex `\.ec\d{2,3}` from the Go side, with the same
+/// per-shard `id <= 255` clamp Go's loader applies via
+/// `strconv.ParseInt(... 10, 64)` followed by the `if shardId < 0 ||
+/// shardId > 255` guard. The 3-digit form (`.ec100`–`.ec255`) is
+/// retained so the parser can still recognise shards from custom
+/// 32+ ratios that fit in a u8 even though OSS only ships 10+4.
+fn parse_ec_shard_extension(ext: &str) -> Option<u32> {
+    let rest = ext.strip_prefix(".ec")?;
+    // `str::parse::<u32>` tolerates a leading `+`, which `\d{2,3}` does
+    // not, so require plain ASCII digits before parsing.
+    if !(2..=3).contains(&rest.len()) || !rest.bytes().all(|b| b.is_ascii_digit()) {
+        return None;
+    }
+    let id: u32 = rest.parse().ok()?;
+    if id > 255 {
+        return None;
+    }
+    Some(id)
+}
+
+/// Robust `.dat` existence check: any unexpected stat error (permission,
+/// I/O) is treated as "exists" so we don't misclassify local EC as
+/// distributed EC. Mirrors `checkDatFileExists` in Go.
+fn check_dat_file_exists(path: &str) -> bool {
+    match fs::metadata(path) {
+        Ok(_) => true,
+        Err(e) if e.kind() == io::ErrorKind::NotFound => false,
+        Err(_) => true,
+    }
+}
+
 /// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId).
 fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> {
     let stem = filename
         .strip_suffix(".dat")
@@ -982,4 +1219,154 @@ mod tests {
         .unwrap();
         assert_eq!(reloaded.directory_uuid, directory_uuid);
     }
+
+    /// Sanity: `parse_ec_shard_extension` accepts `.ec00` through
+    /// `.ec255` and rejects anything else.
+    #[test]
+    fn test_parse_ec_shard_extension() {
+        assert_eq!(parse_ec_shard_extension(".ec00"), Some(0));
+        assert_eq!(parse_ec_shard_extension(".ec13"), Some(13));
+        assert_eq!(parse_ec_shard_extension(".ec999"), None); // > 255
+        assert_eq!(parse_ec_shard_extension(".ec255"), Some(255));
+        assert_eq!(parse_ec_shard_extension(".ec256"), None); // first id past u8
+        assert_eq!(parse_ec_shard_extension(".ec0"), None); // need 2 digits
+        assert_eq!(parse_ec_shard_extension(".ec+1"), None); // sign is not a digit
+        assert_eq!(parse_ec_shard_extension(".ecx"), None);
+        assert_eq!(parse_ec_shard_extension(".ecj"), None);
+        assert_eq!(parse_ec_shard_extension(".dat"), None);
+        assert_eq!(parse_ec_shard_extension("ec00"), None); // missing dot
+    }
+
+    /// Auto-load: shards + .ecx on the same disk get mounted into
+    /// `ec_volumes` after `load_existing_volumes` finishes. Without
+    /// this, EC shards on disk are invisible until something issues
+    /// `VolumeEcShardsMount`.
+    #[test]
+    fn test_load_all_ec_shards_mounts_pairs_with_ecx() {
+        let tmp = TempDir::new().unwrap();
+        let dir = tmp.path().to_str().unwrap();
+        let collection = "grafana-loki";
+        let vid = VolumeId(1234);
+
+        // Two shards + .ecx + .ecj + .vif (the post-encode steady state).
+        for &sid in &[0u8, 4u8] {
+            let path = format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid);
+            std::fs::write(&path, b"shard data nonempty").unwrap();
+        }
+        std::fs::write(format!("{}/{}_{}.ecx", dir, collection, vid.0), vec![0u8; 20])
+            .unwrap();
+        std::fs::write(format!("{}/{}_{}.ecj", dir, collection, vid.0), b"").unwrap();
+        std::fs::write(
+            format!("{}/{}_{}.vif", dir, collection, vid.0),
+            br#"{"version":3}"#,
+        )
+        .unwrap();
+
+        let mut loc = DiskLocation::new(
+            dir,
+            dir,
+            10,
+            DiskType::HardDrive,
+            MinFreeSpace::Percent(0.0),
+            Vec::new(),
+        )
+        .unwrap();
+        loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap();
+
+        let ec_vol = loc.find_ec_volume(vid).expect("EcVolume should be mounted");
+        assert_eq!(ec_vol.shard_count(), 2, "both shards should be loaded");
+    }
+
+    /// Orphan shards with no `.ecx` and no `.dat` must stay on disk
+    /// untouched (distributed-EC scenario, where the .ecx lives on
+    /// another server). Mirrors Go's `checkOrphanedShards` no-op
+    /// branch.
+    #[test]
+    fn test_load_all_ec_shards_keeps_orphan_shards_when_no_dat() {
+        let tmp = TempDir::new().unwrap();
+        let dir = tmp.path().to_str().unwrap();
+        let collection = "grafana-loki";
+        let vid = VolumeId(2345);
+
+        // Shards on disk, but no .ecx and no .dat — distributed-EC
+        // with the .ecx on a sibling/server.
+        let shard_paths: Vec<String> = [0u8, 12u8]
+            .iter()
+            .map(|sid| format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid))
+            .collect();
+        for p in &shard_paths {
+            std::fs::write(p, b"shard data nonempty").unwrap();
+        }
+
+        let mut loc = DiskLocation::new(
+            dir,
+            dir,
+            10,
+            DiskType::HardDrive,
+            MinFreeSpace::Percent(0.0),
+            Vec::new(),
+        )
+        .unwrap();
+        loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap();
+
+        // No EcVolume should be mounted (no .ecx).
+        assert!(loc.find_ec_volume(vid).is_none());
+        // But the shard files must still be on disk for cross-disk
+        // reconciliation / operator recovery.
+        for p in &shard_paths {
+            assert!(
+                std::path::Path::new(p).exists(),
+                "orphan shard {} was destroyed",
+                p,
+            );
+        }
+    }
+
+    /// Orphan shards WITH a stale `.dat` are treated as an
+    /// interrupted-encoding remnant and cleaned up. Mirrors Go's
+    /// `checkOrphanedShards` cleanup branch. Guards a subtle case
+    /// where a partial encoding wrote shards before .ecx and a crash
+    /// left the .dat behind.
+    #[test]
+    fn test_load_all_ec_shards_cleans_orphan_shards_when_dat_exists() {
+        let tmp = TempDir::new().unwrap();
+        let dir = tmp.path().to_str().unwrap();
+        let collection = "grafana-loki";
+        let vid = VolumeId(3456);
+
+        let shard_paths: Vec<String> = [0u8, 1u8]
+            .iter()
+            .map(|sid| format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid))
+            .collect();
+        for p in &shard_paths {
+            std::fs::write(p, b"shard data nonempty").unwrap();
+        }
+        // Stale .dat from interrupted encoding.
+        let dat_path = format!("{}/{}_{}.dat", dir, collection, vid.0);
+        std::fs::write(&dat_path, b"unfinished").unwrap();
+
+        let mut loc = DiskLocation::new(
+            dir,
+            dir,
+            10,
+            DiskType::HardDrive,
+            MinFreeSpace::Percent(0.0),
+            Vec::new(),
+        )
+        .unwrap();
+        // load_existing_volumes processes .dat first; ec auto-load
+        // runs after and finds the orphans. The .dat itself may or may
+        // not be loadable as a regular volume — that's not what this
+        // test is about. Just check that the orphan shard files were
+        // removed.
+        let _ = loc.load_existing_volumes(NeedleMapKind::InMemory);
+
+        for p in &shard_paths {
+            assert!(
+                !std::path::Path::new(p).exists(),
+                "orphan shard {} should have been cleaned up because .dat exists",
+                p,
+            );
+        }
+    }
 }