diff --git a/crates/tranquil-store/src/gauntlet/leak.rs b/crates/tranquil-store/src/gauntlet/leak.rs new file mode 100644 index 0000000..981131b --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/leak.rs @@ -0,0 +1,236 @@ +use std::num::NonZeroU64; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +use super::metrics::{MetricName, MetricsSample}; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct LeakGateConfig { + pub warmup_ms: u64, + pub window_ms: NonZeroU64, + pub growth_limit_pct: f64, +} + +#[derive(Debug, Clone, Copy)] +pub struct LeakGateBuildError(pub &'static str); + +impl std::fmt::Display for LeakGateBuildError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.0) + } +} + +impl std::error::Error for LeakGateBuildError {} + +impl LeakGateConfig { + pub fn try_new( + warmup_ms: u64, + window_ms: u64, + growth_limit_pct: f64, + ) -> Result { + let window_ms = NonZeroU64::new(window_ms) + .ok_or(LeakGateBuildError("leak gate window_ms must be > 0"))?; + if !growth_limit_pct.is_finite() || growth_limit_pct < 0.0 { + return Err(LeakGateBuildError( + "leak gate growth_limit_pct must be finite and non-negative", + )); + } + Ok(Self { + warmup_ms, + window_ms, + growth_limit_pct, + }) + } + + pub fn standard() -> Self { + Self::try_new(60 * 60 * 1_000, 4 * 60 * 60 * 1_000, 5.0) + .expect("standard leak gate config is valid") + } + + pub fn short_for_tests() -> Self { + Self::try_new(60_000, 4 * 60_000, 5.0).expect("short_for_tests leak gate config is valid") + } + + pub fn warmup(&self) -> Duration { + Duration::from_millis(self.warmup_ms) + } + + pub fn window(&self) -> Duration { + Duration::from_millis(self.window_ms.get()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LeakViolation { + pub metric: String, + pub start_ms: u64, + pub end_ms: u64, + pub start_value: u64, + pub end_value: u64, + pub growth_pct: f64, + pub limit_pct: f64, +} + +pub fn evaluate(samples: &[MetricsSample], cfg: LeakGateConfig) -> Vec { + if samples.len() < 2 { + return Vec::new(); + } + MetricName::ALL + .iter() + .flat_map(|&m| evaluate_metric(samples, m, cfg)) + .collect() +} + +fn evaluate_metric( + samples: &[MetricsSample], + metric: MetricName, + cfg: LeakGateConfig, +) -> Option { + let post_warmup: Vec<&MetricsSample> = samples + .iter() + .filter(|s| s.elapsed_ms >= cfg.warmup_ms) + .collect(); + if post_warmup.len() < 2 { + return None; + } + + let min_delta = metric.min_absolute_delta(); + let window = cfg.window_ms.get(); + let mut worst: Option = None; + for (i, start) in post_warmup.iter().enumerate() { + let Some(start_v) = start.metric(metric) else { + continue; + }; + if start_v == 0 { + continue; + } + let deadline = start.elapsed_ms.saturating_add(window); + for end in post_warmup.iter().skip(i + 1) { + if end.elapsed_ms > deadline { + break; + } + let Some(end_v) = end.metric(metric) else { + continue; + }; + if end_v <= start_v { + continue; + } + let delta = end_v - start_v; + if delta < min_delta { + continue; + } + let growth = (delta as f64 / start_v as f64) * 100.0; + if growth <= cfg.growth_limit_pct { + continue; + } + let candidate = LeakViolation { + metric: metric.as_str().to_string(), + start_ms: start.elapsed_ms, + end_ms: end.elapsed_ms, + start_value: start_v, + end_value: end_v, + growth_pct: growth, + limit_pct: cfg.growth_limit_pct, + }; + match &worst { + Some(w) if w.growth_pct >= candidate.growth_pct => {} + _ => worst = Some(candidate), + } + } + } + worst +} + +#[cfg(test)] +mod tests { + use super::*; + + const GIB: u64 = 1024 * 1024 * 1024; + + fn sample(elapsed_ms: u64, rss: u64) -> MetricsSample { + MetricsSample { + elapsed_ms, + rss_bytes: Some(rss), + fd_count: Some(10), + data_dir_bytes: 0, + index_dir_bytes: 0, + segments_dir_bytes: 0, + data_file_count: Some(0), + segment_count: Some(0), + block_index_entries: 0, + hint_file_bytes: 0, + } + } + + #[test] + fn flat_metrics_no_violation() { + let cfg = LeakGateConfig::short_for_tests(); + let series: Vec = (0..20) + .map(|i| sample(60_000 + i * 60_000, GIB)) + .collect(); + assert!(evaluate(&series, cfg).is_empty()); + } + + #[test] + fn growing_rss_flagged() { + let cfg = LeakGateConfig::short_for_tests(); + let series: Vec = (0..20) + .map(|i| sample(60_000 + i * 60_000, GIB + i * 64 * 1024 * 1024)) + .collect(); + let v = evaluate(&series, cfg); + assert!(!v.is_empty()); + assert_eq!(v[0].metric, "rss_bytes"); + assert!(v[0].growth_pct > 5.0); + } + + #[test] + fn warmup_samples_ignored() { + let cfg = LeakGateConfig::short_for_tests(); + let mut series: Vec = Vec::new(); + series.push(sample(10_000, 1)); + series.push(sample(30_000, GIB)); + (0..10).for_each(|i| { + series.push(sample(60_000 + i * 60_000, GIB)); + }); + assert!(evaluate(&series, cfg).is_empty()); + } + + #[test] + fn window_bound_honored() { + let cfg = LeakGateConfig::try_new(0, 2 * 60_000, 5.0).unwrap(); + let series = vec![sample(0, GIB), sample(200_000, 2 * GIB)]; + assert!( + evaluate(&series, cfg).is_empty(), + "200s gap exceeds 120s window, growth must not be flagged" + ); + } + + #[test] + fn small_absolute_delta_not_flagged() { + let cfg = LeakGateConfig::short_for_tests(); + let series: Vec = (0..10) + .map(|i| sample(60_000 + i * 60_000, GIB + i * 1024)) + .collect(); + assert!( + evaluate(&series, cfg).is_empty(), + "kilobyte growth is below the RSS absolute-delta floor" + ); + } + + #[test] + fn missing_metric_samples_skipped() { + let cfg = LeakGateConfig::short_for_tests(); + let mut series: Vec = (0..10) + .map(|i| sample(60_000 + i * 60_000, GIB)) + .collect(); + series[3].rss_bytes = None; + series[7].rss_bytes = None; + assert!(evaluate(&series, cfg).is_empty()); + } + + #[test] + fn zero_window_rejected_at_construction() { + assert!(LeakGateConfig::try_new(0, 0, 5.0).is_err()); + } +} diff --git a/crates/tranquil-store/src/gauntlet/metrics.rs b/crates/tranquil-store/src/gauntlet/metrics.rs new file mode 100644 index 0000000..4b58317 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/metrics.rs @@ -0,0 +1,247 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use tracing::warn; + +use super::runner::{EventLogState, Harness}; +use crate::blockstore::TranquilBlockStore; +use crate::io::StorageIO; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct MetricsSample { + pub elapsed_ms: u64, + pub rss_bytes: Option, + pub fd_count: Option, + pub data_dir_bytes: u64, + pub index_dir_bytes: u64, + pub segments_dir_bytes: u64, + pub data_file_count: Option, + pub segment_count: Option, + pub block_index_entries: u64, + pub hint_file_bytes: u64, +} + +impl MetricsSample { + pub fn metric(&self, name: MetricName) -> Option { + match name { + MetricName::RssBytes => self.rss_bytes, + MetricName::FdCount => self.fd_count, + MetricName::DataDirBytes => Some(self.data_dir_bytes), + MetricName::IndexDirBytes => Some(self.index_dir_bytes), + MetricName::SegmentsDirBytes => Some(self.segments_dir_bytes), + MetricName::DataFileCount => self.data_file_count, + MetricName::SegmentCount => self.segment_count, + MetricName::BlockIndexEntries => Some(self.block_index_entries), + MetricName::HintFileBytes => Some(self.hint_file_bytes), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MetricName { + RssBytes, + FdCount, + DataDirBytes, + IndexDirBytes, + SegmentsDirBytes, + DataFileCount, + SegmentCount, + BlockIndexEntries, + HintFileBytes, +} + +impl MetricName { + pub const ALL: &'static [MetricName] = &[ + Self::RssBytes, + Self::FdCount, + Self::DataDirBytes, + Self::IndexDirBytes, + Self::SegmentsDirBytes, + Self::DataFileCount, + Self::SegmentCount, + Self::BlockIndexEntries, + Self::HintFileBytes, + ]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::RssBytes => "rss_bytes", + Self::FdCount => "fd_count", + Self::DataDirBytes => "data_dir_bytes", + Self::IndexDirBytes => "index_dir_bytes", + Self::SegmentsDirBytes => "segments_dir_bytes", + Self::DataFileCount => "data_file_count", + Self::SegmentCount => "segment_count", + Self::BlockIndexEntries => "block_index_entries", + Self::HintFileBytes => "hint_file_bytes", + } + } + + pub const fn min_absolute_delta(self) -> u64 { + match self { + Self::RssBytes => 16 * 1024 * 1024, + Self::FdCount => 16, + Self::DataDirBytes => 16 * 1024 * 1024, + Self::IndexDirBytes => 1024 * 1024, + Self::SegmentsDirBytes => 16 * 1024 * 1024, + Self::DataFileCount => 16, + Self::SegmentCount => 4, + Self::BlockIndexEntries => 1024, + Self::HintFileBytes => 1024 * 1024, + } + } +} + +pub fn sample_harness( + harness: &Harness, + elapsed: Duration, +) -> MetricsSample { + MetricsSample { + elapsed_ms: u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX), + rss_bytes: read_rss(), + fd_count: count_open_fds(), + data_dir_bytes: dir_bytes(harness.store.data_dir()), + index_dir_bytes: dir_bytes(harness.store.block_index().index_dir()), + segments_dir_bytes: harness + .eventlog + .as_ref() + .map(|el| dir_bytes(&el.segments_dir)) + .unwrap_or(0), + data_file_count: data_file_count(&harness.store), + segment_count: harness.eventlog.as_ref().and_then(segment_count), + block_index_entries: harness.store.block_index().approximate_block_count(), + hint_file_bytes: hint_bytes(harness.store.data_dir()), + } +} + +fn data_file_count( + store: &Arc>, +) -> Option { + match store.list_data_files() { + Ok(v) => Some(v.len() as u64), + Err(e) => { + warn!(error = %e, "gauntlet metrics: list_data_files failed"); + None + } + } +} + +fn segment_count(el: &EventLogState) -> Option { + match el.manager.list_segments() { + Ok(v) => Some(v.len() as u64), + Err(e) => { + warn!(error = %e, "gauntlet metrics: list_segments failed"); + None + } + } +} + +fn dir_bytes(path: &Path) -> u64 { + let Ok(entries) = std::fs::read_dir(path) else { + return 0; + }; + entries + .filter_map(Result::ok) + .map(|entry| match entry.file_type() { + Ok(ft) if ft.is_dir() => dir_bytes(&entry.path()), + Ok(_) => entry.metadata().map(|m| m.len()).unwrap_or(0), + Err(_) => 0, + }) + .sum() +} + +fn hint_bytes(data_dir: &Path) -> u64 { + let Ok(entries) = std::fs::read_dir(data_dir) else { + return 0; + }; + entries + .filter_map(Result::ok) + .filter(|entry| { + entry + .path() + .extension() + .and_then(|e| e.to_str()) + .map(|e| e == "tqh") + .unwrap_or(false) + }) + .map(|entry| entry.metadata().map(|m| m.len()).unwrap_or(0)) + .sum() +} + +#[cfg(target_os = "linux")] +fn read_rss() -> Option { + let status = match std::fs::read_to_string("/proc/self/status") { + Ok(s) => s, + Err(e) => { + warn!(error = %e, "gauntlet metrics: read /proc/self/status failed"); + return None; + } + }; + let parsed = status.lines().find_map(|line| { + let rest = line.strip_prefix("VmRSS:")?; + let kb: u64 = rest.split_whitespace().next()?.parse().ok()?; + Some(kb * 1024) + }); + if parsed.is_none() { + warn!("gauntlet metrics: VmRSS line missing from /proc/self/status"); + } + parsed +} + +#[cfg(not(target_os = "linux"))] +fn read_rss() -> Option { + None +} + +#[cfg(target_os = "linux")] +fn count_open_fds() -> Option { + match std::fs::read_dir("/proc/self/fd") { + Ok(entries) => Some(entries.filter_map(Result::ok).count() as u64), + Err(e) => { + warn!(error = %e, "gauntlet metrics: read /proc/self/fd failed"); + None + } + } +} + +#[cfg(not(target_os = "linux"))] +fn count_open_fds() -> Option { + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn metric_names_roundtrip_strings() { + MetricName::ALL.iter().for_each(|m| { + let s = m.as_str(); + assert!(!s.is_empty()); + }); + } + + #[test] + #[cfg_attr(not(target_os = "linux"), ignore = "linux /proc only")] + fn rss_reads_nonzero() { + let rss = read_rss().expect("rss"); + assert!(rss > 0, "rss should be positive, got {rss}"); + } + + #[test] + #[cfg_attr(not(target_os = "linux"), ignore = "linux /proc only")] + fn fd_count_reads_nonzero() { + let fd = count_open_fds().expect("fd"); + assert!(fd > 0); + } + + #[test] + fn dir_bytes_sums_entries() { + let dir = tempfile::TempDir::new().unwrap(); + std::fs::write(dir.path().join("a"), b"1234").unwrap(); + std::fs::write(dir.path().join("b"), b"5678").unwrap(); + assert_eq!(dir_bytes(dir.path()), 8); + } +}