feat(tranquil-store): leak gate and metrics sampling for gauntlet

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-04-22 21:00:17 +03:00
parent 4fe01cff72
commit 98b94fb170
2 changed files with 483 additions and 0 deletions

View File

@@ -0,0 +1,236 @@
use std::num::NonZeroU64;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use super::metrics::{MetricName, MetricsSample};
/// Thresholds controlling when sustained metric growth is treated as a leak.
///
/// Serialized with serde so a gauntlet run can persist and reload its gate
/// settings alongside results.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct LeakGateConfig {
    // Samples earlier than this many milliseconds are ignored (startup noise).
    pub warmup_ms: u64,
    // Sliding-window length in milliseconds; NonZeroU64 makes a zero-length
    // window unrepresentable (enforced in `try_new`).
    pub window_ms: NonZeroU64,
    // Maximum allowed growth within one window, as a percentage of the
    // window's starting value.
    pub growth_limit_pct: f64,
}
/// Error returned when [`LeakGateConfig::try_new`] rejects its inputs.
///
/// Wraps a static human-readable reason; `Display` prints it verbatim.
#[derive(Debug, Clone, Copy)]
pub struct LeakGateBuildError(pub &'static str);

impl std::fmt::Display for LeakGateBuildError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for LeakGateBuildError {}
impl LeakGateConfig {
    /// Validates raw values and builds a config.
    ///
    /// # Errors
    /// Returns [`LeakGateBuildError`] when `window_ms` is zero, or when
    /// `growth_limit_pct` is NaN, infinite, or negative. The window check
    /// takes precedence when both inputs are bad.
    pub fn try_new(
        warmup_ms: u64,
        window_ms: u64,
        growth_limit_pct: f64,
    ) -> Result<Self, LeakGateBuildError> {
        let Some(window_ms) = NonZeroU64::new(window_ms) else {
            return Err(LeakGateBuildError("leak gate window_ms must be > 0"));
        };
        // NaN fails `is_finite`, so it is rejected along with infinities.
        if growth_limit_pct.is_finite() && growth_limit_pct >= 0.0 {
            Ok(Self {
                warmup_ms,
                window_ms,
                growth_limit_pct,
            })
        } else {
            Err(LeakGateBuildError(
                "leak gate growth_limit_pct must be finite and non-negative",
            ))
        }
    }

    /// Production defaults: 1 h warmup, 4 h window, 5% growth limit.
    pub fn standard() -> Self {
        Self::try_new(60 * 60 * 1_000, 4 * 60 * 60 * 1_000, 5.0)
            .expect("standard leak gate config is valid")
    }

    /// Scaled-down defaults for tests: 1 min warmup, 4 min window, 5% limit.
    pub fn short_for_tests() -> Self {
        Self::try_new(60_000, 4 * 60_000, 5.0).expect("short_for_tests leak gate config is valid")
    }

    /// Warmup period as a [`Duration`].
    pub fn warmup(&self) -> Duration {
        Duration::from_millis(self.warmup_ms)
    }

    /// Window length as a [`Duration`].
    pub fn window(&self) -> Duration {
        Duration::from_millis(self.window_ms.get())
    }
}
/// One flagged window of metric growth, as reported by [`evaluate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeakViolation {
    // Metric identifier as produced by `MetricName::as_str` (e.g. "rss_bytes").
    pub metric: String,
    // Window bounds, in milliseconds since run start.
    pub start_ms: u64,
    pub end_ms: u64,
    // Metric values observed at the window bounds.
    pub start_value: u64,
    pub end_value: u64,
    // Observed growth in percent, and the configured limit it exceeded.
    pub growth_pct: f64,
    pub limit_pct: f64,
}
/// Runs the leak gate over a full sample series, checking every tracked
/// metric. Returns at most one violation per metric (the worst window found).
pub fn evaluate(samples: &[MetricsSample], cfg: LeakGateConfig) -> Vec<LeakViolation> {
    // A growth window needs at least two samples to exist at all.
    if samples.len() < 2 {
        return Vec::new();
    }
    let mut violations = Vec::new();
    for &metric in MetricName::ALL {
        if let Some(violation) = evaluate_metric(samples, metric, cfg) {
            violations.push(violation);
        }
    }
    violations
}
/// Scans every in-window (start, end) sample pair for `metric` and returns
/// the single worst (highest-growth) violation, or `None` if the metric never
/// exceeded the configured limit.
fn evaluate_metric(
    samples: &[MetricsSample],
    metric: MetricName,
    cfg: LeakGateConfig,
) -> Option<LeakViolation> {
    // Discard warmup-period samples entirely: both endpoints of a window must
    // lie at or after `warmup_ms`.
    let post_warmup: Vec<&MetricsSample> = samples
        .iter()
        .filter(|s| s.elapsed_ms >= cfg.warmup_ms)
        .collect();
    if post_warmup.len() < 2 {
        return None;
    }
    // Per-metric absolute-delta floor suppresses percentage noise on tiny
    // absolute changes.
    let min_delta = metric.min_absolute_delta();
    let window = cfg.window_ms.get();
    let mut worst: Option<LeakViolation> = None;
    for (i, start) in post_warmup.iter().enumerate() {
        let Some(start_v) = start.metric(metric) else {
            // Metric not collected for this sample; cannot anchor a window here.
            continue;
        };
        if start_v == 0 {
            // Percentage growth from zero is undefined; skip zero-valued starts.
            continue;
        }
        // Latest end-timestamp still inside the window for this start;
        // saturating_add guards against overflow near u64::MAX.
        let deadline = start.elapsed_ms.saturating_add(window);
        for end in post_warmup.iter().skip(i + 1) {
            if end.elapsed_ms > deadline {
                // The `break` relies on samples being ordered by ascending
                // elapsed_ms: once one end is past the deadline, all later
                // ones are too. NOTE(review): ordering is assumed, not
                // enforced here — confirm callers append chronologically.
                break;
            }
            let Some(end_v) = end.metric(metric) else {
                continue;
            };
            if end_v <= start_v {
                // Only growth is interesting; shrink/flat windows are skipped.
                continue;
            }
            let delta = end_v - start_v;
            if delta < min_delta {
                continue;
            }
            let growth = (delta as f64 / start_v as f64) * 100.0;
            if growth <= cfg.growth_limit_pct {
                continue;
            }
            let candidate = LeakViolation {
                metric: metric.as_str().to_string(),
                start_ms: start.elapsed_ms,
                end_ms: end.elapsed_ms,
                start_value: start_v,
                end_value: end_v,
                growth_pct: growth,
                limit_pct: cfg.growth_limit_pct,
            };
            // Keep the highest-growth violation; on ties the earlier one wins.
            match &worst {
                Some(w) if w.growth_pct >= candidate.growth_pct => {}
                _ => worst = Some(candidate),
            }
        }
    }
    worst
}
#[cfg(test)]
mod tests {
    use super::*;

    const GIB: u64 = 1024 * 1024 * 1024;

    /// Builds a sample where only elapsed time and RSS vary; every other
    /// metric is held constant so it can never trip the gate.
    fn sample(elapsed_ms: u64, rss: u64) -> MetricsSample {
        MetricsSample {
            elapsed_ms,
            rss_bytes: Some(rss),
            fd_count: Some(10),
            data_dir_bytes: 0,
            index_dir_bytes: 0,
            segments_dir_bytes: 0,
            data_file_count: Some(0),
            segment_count: Some(0),
            block_index_entries: 0,
            hint_file_bytes: 0,
        }
    }

    // Constant RSS across the run must produce no violations.
    #[test]
    fn flat_metrics_no_violation() {
        let cfg = LeakGateConfig::short_for_tests();
        let series: Vec<MetricsSample> = (0..20)
            .map(|i| sample(60_000 + i * 60_000, GIB))
            .collect();
        assert!(evaluate(&series, cfg).is_empty());
    }

    // 64 MiB/min growth over a 4-minute window exceeds the 5% limit.
    #[test]
    fn growing_rss_flagged() {
        let cfg = LeakGateConfig::short_for_tests();
        let series: Vec<MetricsSample> = (0..20)
            .map(|i| sample(60_000 + i * 60_000, GIB + i * 64 * 1024 * 1024))
            .collect();
        let v = evaluate(&series, cfg);
        assert!(!v.is_empty());
        assert_eq!(v[0].metric, "rss_bytes");
        assert!(v[0].growth_pct > 5.0);
    }

    // Samples before warmup_ms (60 s here) must not anchor or end a window,
    // even when they show huge apparent growth.
    #[test]
    fn warmup_samples_ignored() {
        let cfg = LeakGateConfig::short_for_tests();
        let mut series: Vec<MetricsSample> = Vec::new();
        series.push(sample(10_000, 1));
        series.push(sample(30_000, GIB));
        (0..10).for_each(|i| {
            series.push(sample(60_000 + i * 60_000, GIB));
        });
        assert!(evaluate(&series, cfg).is_empty());
    }

    // Growth occurring outside the sliding window must not be flagged.
    #[test]
    fn window_bound_honored() {
        let cfg = LeakGateConfig::try_new(0, 2 * 60_000, 5.0).unwrap();
        let series = vec![sample(0, GIB), sample(200_000, 2 * GIB)];
        assert!(
            evaluate(&series, cfg).is_empty(),
            "200s gap exceeds 120s window, growth must not be flagged"
        );
    }

    // Growth below the per-metric absolute-delta floor is treated as noise.
    #[test]
    fn small_absolute_delta_not_flagged() {
        let cfg = LeakGateConfig::short_for_tests();
        let series: Vec<MetricsSample> = (0..10)
            .map(|i| sample(60_000 + i * 60_000, GIB + i * 1024))
            .collect();
        assert!(
            evaluate(&series, cfg).is_empty(),
            "kilobyte growth is below the RSS absolute-delta floor"
        );
    }

    // `None` samples are skipped rather than treated as zero (which would
    // fabricate growth against their neighbors).
    #[test]
    fn missing_metric_samples_skipped() {
        let cfg = LeakGateConfig::short_for_tests();
        let mut series: Vec<MetricsSample> = (0..10)
            .map(|i| sample(60_000 + i * 60_000, GIB))
            .collect();
        series[3].rss_bytes = None;
        series[7].rss_bytes = None;
        assert!(evaluate(&series, cfg).is_empty());
    }

    // Zero-length windows are rejected at construction, not at evaluation.
    #[test]
    fn zero_window_rejected_at_construction() {
        assert!(LeakGateConfig::try_new(0, 0, 5.0).is_err());
    }
}

View File

@@ -0,0 +1,247 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tracing::warn;
use super::runner::{EventLogState, Harness};
use crate::blockstore::TranquilBlockStore;
use crate::io::StorageIO;
/// One timestamped snapshot of process and on-disk metrics for a gauntlet run.
///
/// `Option` fields are `None` when collection failed or the platform does not
/// support the probe (RSS and fd counts are read from Linux `/proc` only).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MetricsSample {
    // Milliseconds since the run started.
    pub elapsed_ms: u64,
    // Resident set size of this process, in bytes.
    pub rss_bytes: Option<u64>,
    // Open file-descriptor count of this process.
    pub fd_count: Option<u64>,
    // Recursive on-disk sizes in bytes (recorded as 0 when unreadable).
    pub data_dir_bytes: u64,
    pub index_dir_bytes: u64,
    pub segments_dir_bytes: u64,
    // Listing-based counts; `None` when the underlying listing call failed.
    pub data_file_count: Option<u64>,
    pub segment_count: Option<u64>,
    // Approximate in-memory block-index entry count.
    pub block_index_entries: u64,
    // Total size of hint (*.tqh) files in the data directory, in bytes.
    pub hint_file_bytes: u64,
}
impl MetricsSample {
    /// Returns the value recorded for `name`, or `None` when that metric was
    /// not collected for this sample.
    pub fn metric(&self, name: MetricName) -> Option<u64> {
        // Process stats and listing-based counts are optional; directory
        // sizes and index counters are always recorded.
        match name {
            MetricName::RssBytes => self.rss_bytes,
            MetricName::FdCount => self.fd_count,
            MetricName::DataFileCount => self.data_file_count,
            MetricName::SegmentCount => self.segment_count,
            MetricName::DataDirBytes => Some(self.data_dir_bytes),
            MetricName::IndexDirBytes => Some(self.index_dir_bytes),
            MetricName::SegmentsDirBytes => Some(self.segments_dir_bytes),
            MetricName::BlockIndexEntries => Some(self.block_index_entries),
            MetricName::HintFileBytes => Some(self.hint_file_bytes),
        }
    }
}
/// Identifies one metric tracked across [`MetricsSample`] snapshots.
///
/// Variants correspond one-to-one with the fields of [`MetricsSample`]
/// (excluding `elapsed_ms`); see [`MetricsSample::metric`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetricName {
    RssBytes,
    FdCount,
    DataDirBytes,
    IndexDirBytes,
    SegmentsDirBytes,
    DataFileCount,
    SegmentCount,
    BlockIndexEntries,
    HintFileBytes,
}
impl MetricName {
    /// Every tracked metric, in the fixed order the leak gate scans them.
    pub const ALL: &'static [MetricName] = &[
        Self::RssBytes,
        Self::FdCount,
        Self::DataDirBytes,
        Self::IndexDirBytes,
        Self::SegmentsDirBytes,
        Self::DataFileCount,
        Self::SegmentCount,
        Self::BlockIndexEntries,
        Self::HintFileBytes,
    ];

    /// Stable snake_case identifier, used in violation records and reports.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::RssBytes => "rss_bytes",
            Self::FdCount => "fd_count",
            Self::DataDirBytes => "data_dir_bytes",
            Self::IndexDirBytes => "index_dir_bytes",
            Self::SegmentsDirBytes => "segments_dir_bytes",
            Self::DataFileCount => "data_file_count",
            Self::SegmentCount => "segment_count",
            Self::BlockIndexEntries => "block_index_entries",
            Self::HintFileBytes => "hint_file_bytes",
        }
    }

    /// Noise floor for the leak gate: growth smaller than this absolute
    /// delta is never flagged, regardless of percentage. Byte-valued
    /// metrics use MiB-scale floors; count-valued metrics use small
    /// integer floors.
    pub const fn min_absolute_delta(self) -> u64 {
        match self {
            Self::RssBytes => 16 * 1024 * 1024,       // 16 MiB
            Self::FdCount => 16,                      // descriptors
            Self::DataDirBytes => 16 * 1024 * 1024,   // 16 MiB
            Self::IndexDirBytes => 1024 * 1024,       // 1 MiB
            Self::SegmentsDirBytes => 16 * 1024 * 1024, // 16 MiB
            Self::DataFileCount => 16,                // files
            Self::SegmentCount => 4,                  // segments
            Self::BlockIndexEntries => 1024,          // entries
            Self::HintFileBytes => 1024 * 1024,       // 1 MiB
        }
    }
}
/// Captures one point-in-time [`MetricsSample`] for a running harness.
///
/// Never fails: metrics that cannot be collected are recorded as `None`
/// (process stats, listing-based counts) or 0 (directory sizes).
pub fn sample_harness<S: StorageIO + Send + Sync + 'static>(
    harness: &Harness<S>,
    elapsed: Duration,
) -> MetricsSample {
    MetricsSample {
        // Saturate instead of panicking if elapsed exceeds u64 milliseconds.
        elapsed_ms: u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        rss_bytes: read_rss(),
        fd_count: count_open_fds(),
        data_dir_bytes: dir_bytes(harness.store.data_dir()),
        index_dir_bytes: dir_bytes(harness.store.block_index().index_dir()),
        // Without an event log there is no segments directory to measure.
        segments_dir_bytes: harness
            .eventlog
            .as_ref()
            .map(|el| dir_bytes(&el.segments_dir))
            .unwrap_or(0),
        data_file_count: data_file_count(&harness.store),
        segment_count: harness.eventlog.as_ref().and_then(segment_count),
        block_index_entries: harness.store.block_index().approximate_block_count(),
        hint_file_bytes: hint_bytes(harness.store.data_dir()),
    }
}
fn data_file_count<S: StorageIO + Send + Sync + 'static>(
store: &Arc<TranquilBlockStore<S>>,
) -> Option<u64> {
match store.list_data_files() {
Ok(v) => Some(v.len() as u64),
Err(e) => {
warn!(error = %e, "gauntlet metrics: list_data_files failed");
None
}
}
}
fn segment_count<S: StorageIO + Send + Sync + 'static>(el: &EventLogState<S>) -> Option<u64> {
match el.manager.list_segments() {
Ok(v) => Some(v.len() as u64),
Err(e) => {
warn!(error = %e, "gauntlet metrics: list_segments failed");
None
}
}
}
/// Recursively sums the sizes (in bytes) of all files under `path`.
///
/// Unreadable directories/entries contribute 0 — sampling must never fail.
/// NOTE(review): `DirEntry::file_type` does not traverse symlinks, so a
/// symlink to a directory is sized via its target's metadata rather than
/// recursed into — confirm that is acceptable for these data dirs.
fn dir_bytes(path: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(path) else {
        return 0;
    };
    let mut total: u64 = 0;
    for entry in entries.flatten() {
        total += match entry.file_type() {
            Ok(ft) if ft.is_dir() => dir_bytes(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |m| m.len()),
            Err(_) => 0,
        };
    }
    total
}
/// Sums the sizes (in bytes) of hint files — entries with a `tqh`
/// extension — directly inside `data_dir` (non-recursive).
///
/// An unreadable directory or entry contributes 0.
fn hint_bytes(data_dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(data_dir) else {
        return 0;
    };
    let mut total: u64 = 0;
    for entry in entries.flatten() {
        let is_hint = entry
            .path()
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext == "tqh")
            .unwrap_or(false);
        if is_hint {
            total += entry.metadata().map_or(0, |m| m.len());
        }
    }
    total
}
#[cfg(target_os = "linux")]
fn read_rss() -> Option<u64> {
let status = match std::fs::read_to_string("/proc/self/status") {
Ok(s) => s,
Err(e) => {
warn!(error = %e, "gauntlet metrics: read /proc/self/status failed");
return None;
}
};
let parsed = status.lines().find_map(|line| {
let rest = line.strip_prefix("VmRSS:")?;
let kb: u64 = rest.split_whitespace().next()?.parse().ok()?;
Some(kb * 1024)
});
if parsed.is_none() {
warn!("gauntlet metrics: VmRSS line missing from /proc/self/status");
}
parsed
}
#[cfg(not(target_os = "linux"))]
fn read_rss() -> Option<u64> {
None
}
#[cfg(target_os = "linux")]
fn count_open_fds() -> Option<u64> {
match std::fs::read_dir("/proc/self/fd") {
Ok(entries) => Some(entries.filter_map(Result::ok).count() as u64),
Err(e) => {
warn!(error = %e, "gauntlet metrics: read /proc/self/fd failed");
None
}
}
}
#[cfg(not(target_os = "linux"))]
fn count_open_fds() -> Option<u64> {
None
}
#[cfg(test)]
mod tests {
    use super::*;

    // Every metric must expose a non-empty identifier for reports.
    #[test]
    fn metric_names_roundtrip_strings() {
        MetricName::ALL.iter().for_each(|m| {
            let s = m.as_str();
            assert!(!s.is_empty());
        });
    }

    // A live process always has nonzero RSS; only meaningful on Linux /proc.
    #[test]
    #[cfg_attr(not(target_os = "linux"), ignore = "linux /proc only")]
    fn rss_reads_nonzero() {
        let rss = read_rss().expect("rss");
        assert!(rss > 0, "rss should be positive, got {rss}");
    }

    // The test process holds at least stdio descriptors open.
    #[test]
    #[cfg_attr(not(target_os = "linux"), ignore = "linux /proc only")]
    fn fd_count_reads_nonzero() {
        let fd = count_open_fds().expect("fd");
        assert!(fd > 0);
    }

    // Two 4-byte files should sum to 8 bytes.
    #[test]
    fn dir_bytes_sums_entries() {
        let dir = tempfile::TempDir::new().unwrap();
        std::fs::write(dir.path().join("a"), b"1234").unwrap();
        std::fs::write(dir.path().join("b"), b"5678").unwrap();
        assert_eq!(dir_bytes(dir.path()), 8);
    }
}