feat(seaweed-volume): auto-load EC shards on startup (#9212) (#9251)

* feat(seaweed-volume): auto-load EC shards on startup

The Rust volume server's load_existing_volumes only scanned .dat
files, so EC shards on disk stayed invisible until something explicitly
issued VolumeEcShardsMount. This is a strict superset of the issue that
seaweedfs/seaweedfs#9212 reports for Go: after a fresh restart, every
local EC shard was missing from the master's view.

Port loadAllEcShards from weed/storage/disk_location_ec.go:

- DiskLocation::load_all_ec_shards walks Directory (and IdxDirectory
  if separate) sorted, groups .ec?? shard files by (collection, vid),
  validates and mounts each group when its matching .ecx is found.
- handle_found_ecx_file: validate_ec_volume + mount_ec_shards path,
  with cleanup when .dat exists and validation fails (incomplete
  encoding) or load fails.
- check_orphaned_shards: cleans up shard remnants whose .ecx never
  arrived AND whose stale .dat is still present (interrupted
  encoding); leaves them on disk otherwise so cross-disk
  reconciliation / operator recovery can find them.
- check_dat_file_exists / parse_collection_volume_id /
  parse_ec_shard_extension: small helpers mirroring Go's checkDatFileExists,
  parseCollectionVolumeId, and the `\.ec\d{2,3}` regex.
- Wire through load_existing_volumes after the .dat scan; failures
  log but don't fail the disk's startup.
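
The grouping pass described in the list above can be sketched roughly as
follows. This is a simplified, standalone illustration and not the code in
this commit: `Group`, `parse_base`, `shard_id`, and `group_shards` are
hypothetical names, volume ids are plain `u32`, and the sketch only records
whether an `.ecx` was seen instead of validating, mounting, or cleaning up.

```rust
/// Hypothetical result type for this sketch: one (collection, vid) group.
#[derive(Debug, PartialEq)]
struct Group {
    collection: String,
    vid: u32,
    shard_ids: Vec<u32>,
    has_ecx: bool,
}

/// Split "<collection>_<vid>" or "<vid>" into its parts.
fn parse_base(base: &str) -> Option<(String, u32)> {
    match base.rfind('_') {
        Some(pos) => {
            let vid = base[pos + 1..].parse::<u32>().ok()?;
            Some((base[..pos].to_string(), vid))
        }
        None => Some((String::new(), base.parse::<u32>().ok()?)),
    }
}

/// Return the shard id for ".ecNN" / ".ecNNN" extensions with id <= 255.
fn shard_id(ext: &str) -> Option<u32> {
    let digits = ext.strip_prefix(".ec")?;
    if !(2..=3).contains(&digits.len()) || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse::<u32>().ok().filter(|id| *id <= 255)
}

/// Group shard files by (collection, vid) and note whether an .ecx follows.
fn group_shards(mut names: Vec<String>) -> Vec<Group> {
    // Sorting puts a volume's ".ecNN" files directly before its ".ecx".
    names.sort();
    let mut groups: Vec<Group> = Vec::new();
    for name in names {
        let Some(dot) = name.rfind('.') else { continue };
        let (base, ext) = name.split_at(dot); // ext keeps the leading '.'
        let Some((collection, vid)) = parse_base(base) else { continue };
        let sid = shard_id(ext);
        if sid.is_none() && ext != ".ecx" {
            continue; // .dat, .vif, .ecj, ... are not part of the grouping
        }
        let same = groups
            .last()
            .map_or(false, |g| g.collection == collection && g.vid == vid);
        if !same {
            if sid.is_none() {
                continue; // an .ecx with no preceding shards
            }
            groups.push(Group { collection, vid, shard_ids: Vec::new(), has_ecx: false });
        }
        let group = groups.last_mut().unwrap();
        match sid {
            Some(id) => group.shard_ids.push(id),
            None => group.has_ecx = true,
        }
    }
    groups
}
```

In this sketch a group with `has_ecx == true` corresponds to the
handle_found_ecx_file path, and a group without it to the
check_orphaned_shards path.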

Tests:
- test_parse_ec_shard_extension covers .ec00–.ec255 and the rejection
  of .ec0, .ec999, .ecx, .ecj, .dat, and missing leading dot.
- test_load_all_ec_shards_mounts_pairs_with_ecx: shards + .ecx + .vif
  on disk get mounted into ec_volumes after load_existing_volumes.
- test_load_all_ec_shards_keeps_orphan_shards_when_no_dat: orphan
  shards (no .ecx, no .dat) stay on disk untouched
  (distributed-EC scenario).
- test_load_all_ec_shards_cleans_orphan_shards_when_dat_exists:
  orphan shards alongside a stale .dat get cleaned up
  (interrupted-encoding scenario).

Prerequisite for porting the cross-disk orphan-shard reconciliation
in seaweedfs/seaweedfs#9244 to Rust.

* fix(seaweed-volume): dedupe filenames when scanning data + idx dirs

load_all_ec_shards scans both `directory` and `idx_directory` (when
they differ) so the loop can pair `.ec??` shards with their `.ecx`
regardless of which dir owns the index. If the same filename is
present in both — possible in idempotent legacy layouts that
pre-date `-dir.idx` — the previous implementation processed it
twice. mount_ec_shards increments the per-shard `ec_shards` metric
inside the loop, so a duplicated `.ec??` entry would double-count
the gauge.

Use a HashSet<String> while accumulating entries so each filename
is processed exactly once.
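
As a minimal sketch of the dedup idea, assuming the two directory listings
are already available as string slices (`merge_unique` is a hypothetical
helper, not a function in this commit):

```rust
use std::collections::HashSet;

/// Merge two directory listings, keeping only the first occurrence of
/// each filename, then sort so shards sit next to their index file.
fn merge_unique(data_dir: &[&str], idx_dir: &[&str]) -> Vec<String> {
    let mut seen: HashSet<String> = HashSet::new();
    let mut entries: Vec<String> = Vec::new();
    for name in data_dir.iter().chain(idx_dir.iter()) {
        // `HashSet::insert` returns false when the value was already present,
        // so a name listed in both directories is pushed only once.
        if seen.insert(name.to_string()) {
            entries.push(name.to_string());
        }
    }
    entries.sort();
    entries
}
```

The `Vec` is kept alongside the `HashSet` because the later grouping pass
relies on sorted order, which a `HashSet` does not provide.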

Reported in PR #9251 review by @gemini-code-assist.

* fix(seaweed-volume): drive partial-mount cleanup through unmount_ec_shards

handle_found_ecx_file calls mount_ec_shards which adds shards one at
a time. mount_ec_shards increments the `ec_shards` gauge per shard
that successfully attaches. If mount fails halfway, plain
ec_volumes.remove(vid) drops the EcVolume but leaves the gauge
incremented for whatever did mount.

Drive the cleanup branches through unmount_ec_shards instead — it
mirror-decrements the gauge per shard and only then drops the
EcVolume. Same shape applied to both .dat-exists and distributed-EC
fallbacks.
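
The leak and its fix can be illustrated with a toy model. Everything here
is hypothetical and much simpler than the real types: `Location` stands in
for DiskLocation, `gauge` for the `ec_shards` metric, and `fail_at` merely
simulates a mount that fails partway through.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a disk location with a shard gauge.
#[derive(Default)]
struct Location {
    ec_volumes: HashMap<u32, Vec<u32>>, // vid -> mounted shard ids
    gauge: i64,                         // stand-in for the `ec_shards` metric
}

impl Location {
    /// Mount shards one at a time; fail on the first id >= `fail_at`.
    fn mount(&mut self, vid: u32, shard_ids: &[u32], fail_at: u32) -> Result<(), String> {
        for &sid in shard_ids {
            if sid >= fail_at {
                return Err(format!("shard {sid} failed to mount"));
            }
            self.ec_volumes.entry(vid).or_default().push(sid);
            self.gauge += 1; // incremented per shard that attached
        }
        Ok(())
    }

    /// Decrement the gauge once per shard that was actually mounted,
    /// then drop the volume entry when no shards remain.
    fn unmount(&mut self, vid: u32, shard_ids: &[u32]) {
        if let Some(mounted) = self.ec_volumes.get_mut(&vid) {
            for sid in shard_ids {
                if let Some(pos) = mounted.iter().position(|m| m == sid) {
                    mounted.remove(pos);
                    self.gauge -= 1;
                }
            }
            if mounted.is_empty() {
                self.ec_volumes.remove(&vid);
            }
        }
    }
}
```

In this model, calling `ec_volumes.remove(&vid)` directly after a failed
`mount` would leave `gauge` at the number of shards that did attach, while
`unmount` returns it to zero.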

Reported in PR #9251 review by @gemini-code-assist.

* docs(seaweed-volume): clarify parse_ec_shard_extension shard-id range

Doc previously said `.ec00`–`.ec999` but the implementation rejects
any shard id > 255 (matches the `EcVolumeShard` u8 typed shard id
and Go's `strconv.ParseInt(... 10, 64)` + `> 255` guard). Fix the
doc to say `.ec00`–`.ec255` and explain why the 3-digit form is
still recognised.
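
One detail worth keeping in mind when mirroring the `\.ec\d{2,3}` regex in
Rust: the standard integer `parse` accepts an optional leading `+`, so a
length check followed by `parse` alone would also accept an extension such
as `.ec+1`. The sketch below adds an explicit digit check; the name
`parse_shard_ext_strict` and the `u8` return type are assumptions made for
illustration and are not part of this commit.

```rust
/// Hypothetical stricter variant: accepts ".ec00" through ".ec255" only.
fn parse_shard_ext_strict(ext: &str) -> Option<u8> {
    let digits = ext.strip_prefix(".ec")?;
    // Two or three characters, all ASCII digits (rules out "+1", "-1", "x").
    if !(2..=3).contains(&digits.len()) || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Parsing into u8 rejects anything above 255 without a separate guard.
    digits.parse::<u8>().ok()
}
```

Returning `u8` makes the 255 upper bound part of the type rather than a
separate comparison, at the cost of a conversion wherever callers expect
`u32`.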

Reported in PR #9251 review by @coderabbitai.
Chris Lu
2026-04-27 16:41:46 -07:00
committed by GitHub
parent 933ae6e386
commit 49e83a26cb


@@ -202,6 +202,15 @@ impl DiskLocation {
}
}
// After regular volumes, auto-discover EC shards on disk so a
// fresh restart picks up shards without an explicit
// VolumeEcShardsMount RPC. Mirrors Go's loadExistingVolumes
// calling loadAllEcShards. Failures here only log — they
// shouldn't fail the whole disk's startup.
if let Err(e) = self.load_all_ec_shards() {
warn!(directory = %self.directory, error = %e, "load_all_ec_shards failed");
}
Ok(())
}
@@ -625,6 +634,183 @@ impl DiskLocation {
}
}
/// Auto-discover .ec?? shard files on disk and mount them.
///
/// Mirrors `loadAllEcShards` in `weed/storage/disk_location_ec.go`.
/// Reads the data dir (and idx dir, if separate), groups shards by
/// (collection, vid) using sorted iteration so a volume's shards
/// land contiguously next to their `.ecx`. When the matching `.ecx`
/// is encountered, validates and mounts the group.
///
/// Without this, an EC shard on disk is invisible after a Rust
/// volume-server restart until something explicitly issues
/// VolumeEcShardsMount — strict superset of seaweedfs/seaweedfs#9212.
pub fn load_all_ec_shards(&mut self) -> io::Result<()> {
// Use a HashSet during accumulation so a name appearing in
// both data dir and idx dir (idempotent legacy layouts can
// place .ecx in either) is processed exactly once. Without
// dedup, mount_ec_shards' per-shard metric increment would
// double-count for those filenames.
let mut seen: HashSet<String> = HashSet::new();
let mut entries: Vec<String> = Vec::new();
for ent in fs::read_dir(&self.directory)? {
let ent = ent?;
if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
continue;
}
let name = ent.file_name().to_string_lossy().into_owned();
if seen.insert(name.clone()) {
entries.push(name);
}
}
if self.idx_directory != self.directory {
for ent in fs::read_dir(&self.idx_directory)? {
let ent = ent?;
if ent.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
continue;
}
let name = ent.file_name().to_string_lossy().into_owned();
if seen.insert(name.clone()) {
entries.push(name);
}
}
}
entries.sort();
let mut same_volume_shards: Vec<(String, u32)> = Vec::new(); // (filename, shard_id)
let mut prev_vid: Option<VolumeId> = None;
let mut prev_collection: String = String::new();
for name in entries {
let Some(dot) = name.rfind('.') else {
continue;
};
let (base, ext) = name.split_at(dot); // ext includes leading '.'
let Some((collection, vid)) = parse_collection_volume_id(base) else {
continue;
};
if let Some(shard_id) = parse_ec_shard_extension(ext) {
// Ignore zero-byte shards — same as Go.
let path = format!("{}/{}", self.directory, name);
let size = fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
if size == 0 {
continue;
}
let same_group = match prev_vid {
None => true,
Some(p) => p == vid && prev_collection == collection,
};
if same_group {
same_volume_shards.push((name, shard_id));
} else {
self.check_orphaned_shards(
&same_volume_shards,
&prev_collection,
prev_vid.unwrap_or(VolumeId(0)),
);
same_volume_shards = vec![(name, shard_id)];
}
prev_vid = Some(vid);
prev_collection = collection;
continue;
}
if ext == ".ecx"
&& prev_vid == Some(vid)
&& prev_collection == collection
&& !same_volume_shards.is_empty()
{
let shards = std::mem::take(&mut same_volume_shards);
self.handle_found_ecx_file(&shards, &collection, vid);
prev_vid = None;
prev_collection.clear();
}
}
// Handle any trailing group that never saw a matching .ecx.
self.check_orphaned_shards(
&same_volume_shards,
&prev_collection,
prev_vid.unwrap_or(VolumeId(0)),
);
Ok(())
}
/// Validate + mount a (collection, vid) group when its `.ecx` is
/// found. Mirrors `handleFoundEcxFile` in
/// `weed/storage/disk_location_ec.go`.
fn handle_found_ecx_file(&mut self, shards: &[(String, u32)], collection: &str, vid: VolumeId) {
let base = volume_file_name(&self.directory, collection, vid);
let dat_path = format!("{}.dat", base);
let dat_exists = check_dat_file_exists(&dat_path);
if dat_exists && !self.validate_ec_volume(collection, vid) {
warn!(
volume_id = vid.0,
"Incomplete or invalid EC volume: .dat exists but validation failed, cleaning up EC files",
);
self.remove_ec_volume_files(collection, vid);
return;
}
let shard_ids: Vec<u32> = shards.iter().map(|(_, sid)| *sid).collect();
if let Err(e) = self.mount_ec_shards(vid, collection, &shard_ids) {
// mount_ec_shards adds shards one at a time and increments
// the per-shard metric for each. If it fails halfway, plain
// ec_volumes.remove(vid) would leak metric increments for
// the shards that did mount. Drive cleanup through
// unmount_ec_shards which mirror-decrements the metric, then
// the empty EcVolume drops itself.
if dat_exists {
warn!(
volume_id = vid.0,
"Failed to load EC shards and .dat exists ({}), cleaning up EC files to use .dat",
e,
);
self.unmount_ec_shards(vid, &shard_ids);
self.remove_ec_volume_files(collection, vid);
} else {
warn!(
volume_id = vid.0,
"Failed to load EC shards: {} (this may be normal for distributed EC volumes)",
e,
);
self.unmount_ec_shards(vid, &shard_ids);
}
}
}
/// Mirrors `checkOrphanedShards`: if shards exist on disk without a
/// matching `.ecx` AND the legacy `.dat` is still present, treat
/// the encoding as interrupted before `.ecx` was written and clean
/// up. If `.dat` is gone, leave files alone — they may be
/// distributed-EC shards waiting for cross-disk reconciliation.
fn check_orphaned_shards(
&self,
shards: &[(String, u32)],
collection: &str,
vid: VolumeId,
) -> bool {
if shards.is_empty() || vid.0 == 0 {
return false;
}
let base = volume_file_name(&self.directory, collection, vid);
let dat_path = format!("{}.dat", base);
if check_dat_file_exists(&dat_path) {
warn!(
volume_id = vid.0,
"Found {} EC shards without .ecx file (incomplete encoding interrupted before .ecx), cleaning up",
shards.len(),
);
self.remove_ec_volume_files(collection, vid);
return true;
}
false
}
/// Total number of EC shards on this location.
pub fn ec_shard_count(&self) -> usize {
self.ec_volumes
@@ -697,6 +883,55 @@ fn calculate_expected_shard_size(dat_file_size: i64) -> i64 {
}
/// Parse a volume filename like "collection_42.dat" or "42.dat" into (collection, VolumeId).
/// Parse a `<collection>_<vid>` or `<vid>` base name into its parts.
/// Mirrors `parseCollectionVolumeId` in
/// `weed/storage/disk_location.go`. Used when iterating raw filenames
/// where the extension has already been stripped.
fn parse_collection_volume_id(base: &str) -> Option<(String, VolumeId)> {
if let Some(pos) = base.rfind('_') {
let collection = &base[..pos];
let id_str = &base[pos + 1..];
let id: u32 = id_str.parse().ok()?;
Some((collection.to_string(), VolumeId(id)))
} else {
let id: u32 = base.parse().ok()?;
Some((String::new(), VolumeId(id)))
}
}
/// Recognise EC shard extensions `.ec00`–`.ec255` (the ShardId u8
/// range — see `EcVolumeShard`'s typed shard id) and return the
/// shard id. Returns `None` for any other extension.
///
/// Matches the regex `\.ec\d{2,3}` from the Go side, with the same
/// per-shard `id <= 255` clamp Go's loader applies via
/// `strconv.ParseInt(... 10, 64)` followed by the `if shardId < 0 ||
/// shardId > 255` guard. The 3-digit form (`.ec100`–`.ec255`) is
/// retained so the parser can still recognise shards from custom
/// 32+ ratios that fit in a u8 even though OSS only ships 10+4.
fn parse_ec_shard_extension(ext: &str) -> Option<u32> {
let rest = ext.strip_prefix(".ec")?;
if rest.len() < 2 || rest.len() > 3 {
return None;
}
let id: u32 = rest.parse().ok()?;
if id > 255 {
return None;
}
Some(id)
}
/// Robust `.dat` existence check: any unexpected stat error (permission,
/// I/O) is treated as "exists" so we don't misclassify local EC as
/// distributed EC. Mirrors `checkDatFileExists` in Go.
fn check_dat_file_exists(path: &str) -> bool {
match fs::metadata(path) {
Ok(_) => true,
Err(e) if e.kind() == io::ErrorKind::NotFound => false,
Err(_) => true,
}
}
fn parse_volume_filename(filename: &str) -> Option<(String, VolumeId)> {
let stem = filename
.strip_suffix(".dat")
@@ -982,4 +1217,152 @@ mod tests {
.unwrap();
assert_eq!(reloaded.directory_uuid, directory_uuid);
}
/// Sanity: `parse_ec_shard_extension` accepts `.ec00` through
/// `.ec255` and rejects anything else.
#[test]
fn test_parse_ec_shard_extension() {
assert_eq!(parse_ec_shard_extension(".ec00"), Some(0));
assert_eq!(parse_ec_shard_extension(".ec13"), Some(13));
assert_eq!(parse_ec_shard_extension(".ec999"), None); // > 255
assert_eq!(parse_ec_shard_extension(".ec255"), Some(255));
assert_eq!(parse_ec_shard_extension(".ec0"), None); // need 2 digits
assert_eq!(parse_ec_shard_extension(".ecx"), None);
assert_eq!(parse_ec_shard_extension(".ecj"), None);
assert_eq!(parse_ec_shard_extension(".dat"), None);
assert_eq!(parse_ec_shard_extension("ec00"), None); // missing dot
}
/// Auto-load: shards + .ecx on the same disk get mounted into
/// `ec_volumes` after `load_existing_volumes` finishes. Without
/// this, EC shards on disk are invisible until something issues
/// `VolumeEcShardsMount`.
#[test]
fn test_load_all_ec_shards_mounts_pairs_with_ecx() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let collection = "grafana-loki";
let vid = VolumeId(1234);
// Two shards + .ecx + .ecj + .vif (the post-encode steady state).
for &sid in &[0u8, 4u8] {
let path = format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid);
std::fs::write(&path, b"shard data nonempty").unwrap();
}
std::fs::write(format!("{}/{}_{}.ecx", dir, collection, vid.0), vec![0u8; 20])
.unwrap();
std::fs::write(format!("{}/{}_{}.ecj", dir, collection, vid.0), b"").unwrap();
std::fs::write(
format!("{}/{}_{}.vif", dir, collection, vid.0),
br#"{"version":3}"#,
)
.unwrap();
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(0.0),
Vec::new(),
)
.unwrap();
loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap();
let ec_vol = loc.find_ec_volume(vid).expect("EcVolume should be mounted");
assert_eq!(ec_vol.shard_count(), 2, "both shards should be loaded");
}
/// Orphan shards with no `.ecx` and no `.dat` must stay on disk
/// untouched (distributed-EC scenario, where the .ecx lives on
/// another server). Mirrors Go's `checkOrphanedShards` no-op
/// branch.
#[test]
fn test_load_all_ec_shards_keeps_orphan_shards_when_no_dat() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let collection = "grafana-loki";
let vid = VolumeId(2345);
// Shards on disk, but no .ecx and no .dat: distributed EC
// with the .ecx on a sibling disk or another server.
let shard_paths: Vec<String> = [0u8, 12u8]
.iter()
.map(|sid| format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid))
.collect();
for p in &shard_paths {
std::fs::write(p, b"shard data nonempty").unwrap();
}
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(0.0),
Vec::new(),
)
.unwrap();
loc.load_existing_volumes(NeedleMapKind::InMemory).unwrap();
// No EcVolume should be mounted (no .ecx).
assert!(loc.find_ec_volume(vid).is_none());
// But the shard files must still be on disk for cross-disk
// reconciliation / operator recovery.
for p in &shard_paths {
assert!(
std::path::Path::new(p).exists(),
"orphan shard {} was destroyed",
p,
);
}
}
/// Orphan shards WITH a stale `.dat` are treated as an
/// interrupted-encoding remnant and cleaned up. Mirrors Go's
/// `checkOrphanedShards` cleanup branch. Guards a subtle case
/// where a partial encoding wrote shards before .ecx and a crash
/// left the .dat behind.
#[test]
fn test_load_all_ec_shards_cleans_orphan_shards_when_dat_exists() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let collection = "grafana-loki";
let vid = VolumeId(3456);
let shard_paths: Vec<String> = [0u8, 1u8]
.iter()
.map(|sid| format!("{}/{}_{}.ec{:02}", dir, collection, vid.0, sid))
.collect();
for p in &shard_paths {
std::fs::write(p, b"shard data nonempty").unwrap();
}
// Stale .dat from interrupted encoding.
let dat_path = format!("{}/{}_{}.dat", dir, collection, vid.0);
std::fs::write(&dat_path, b"unfinished").unwrap();
let mut loc = DiskLocation::new(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(0.0),
Vec::new(),
)
.unwrap();
// load_existing_volumes processes .dat first; ec auto-load
// runs after and finds the orphans. The .dat itself may or may
// not be loadable as a regular volume — that's not what this
// test is about. Just check that the orphan shard files were
// removed.
let _ = loc.load_existing_volumes(NeedleMapKind::InMemory);
for p in &shard_paths {
assert!(
!std::path::Path::new(p).exists(),
"orphan shard {} should have been cleaned up because .dat exists",
p,
);
}
}
}