diff --git a/.config/nextest.toml b/.config/nextest.toml index 923cc0d..8d67243 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -45,6 +45,7 @@ slow-timeout = { period = "600s", terminate-after = 1 } retries = 0 fail-fast = false test-threads = 1 +slow-timeout = { period = "5m", terminate-after = 1000 } [test-groups] serial-env-tests = { max-threads = 1 } diff --git a/crates/tranquil-api/src/admin/signal.rs b/crates/tranquil-api/src/admin/signal.rs index 8a56df0..cc956b5 100644 --- a/crates/tranquil-api/src/admin/signal.rs +++ b/crates/tranquil-api/src/admin/signal.rs @@ -76,7 +76,7 @@ pub async fn link_signal_device( let result = tokio::select! { biased; _ = shutdown.cancelled() => { - tracing::info!("Signal linking aborted due to server shutdown"); + tracing::info!("server shutting down, aborting signal linking"); return; } r = link_result.completion => r, @@ -84,10 +84,10 @@ pub async fn link_signal_device( match result { Ok(Ok(client)) => { if slot_for_task.complete_link(generation, client).await { - tracing::info!("Signal device linked successfully"); + tracing::info!("signal device linked"); } else { tracing::warn!( - "Signal link completed but generation mismatch or already linked; discarding" + "discarding completed signal link, generation mismatch or already linked" ); } } diff --git a/crates/tranquil-api/src/notification_prefs.rs b/crates/tranquil-api/src/notification_prefs.rs index f690e72..323bee7 100644 --- a/crates/tranquil-api/src/notification_prefs.rs +++ b/crates/tranquil-api/src/notification_prefs.rs @@ -282,7 +282,7 @@ async fn process_messaging_channel_update( "Invalid Telegram username. Must be 5-32 characters, alphanumeric or underscore".into(), ), CommsChannel::Signal => ApiError::InvalidRequest( - "Invalid Signal username. Must be 3-32 characters followed by .XX (e.g. username.01)".into(), + "Invalid Signal username. Must be a 3-32 character nickname, a dot, then a 2-20 digit discriminator".into(), ), CommsChannel::Email => ApiError::InvalidEmail, }); diff --git a/crates/tranquil-comms/src/sender.rs b/crates/tranquil-comms/src/sender.rs index 5d9eec1..cf5a9bc 100644 --- a/crates/tranquil-comms/src/sender.rs +++ b/crates/tranquil-comms/src/sender.rs @@ -299,7 +299,7 @@ impl DiscordSender { "type": 1, "options": [{ "name": "handle", - "description": "Your PDS handle (e.g. alice.example.com)", + "description": "Your PDS handle", "type": 3, "required": false }] diff --git a/crates/tranquil-config/src/lib.rs b/crates/tranquil-config/src/lib.rs index 3849d79..8583872 100644 --- a/crates/tranquil-config/src/lib.rs +++ b/crates/tranquil-config/src/lib.rs @@ -394,7 +394,7 @@ impl TranquilConfig { #[derive(Debug, Config)] pub struct ServerConfig { - /// Public hostname of the PDS (e.g. `pds.example.com`). + /// Public hostname of the PDS, such as `pds.example.com`. #[config(env = "PDS_HOSTNAME")] pub hostname: String, @@ -463,8 +463,7 @@ impl ServerConfig { format!("https://{}", self.hostname) } - /// Hostname without port suffix (e.g. `pds.example.com` from - /// `pds.example.com:443`). + /// Hostname without port suffix. Returns `pds.example.com` from `pds.example.com:443`. pub fn hostname_without_port(&self) -> &str { self.hostname.split(':').next().unwrap_or(&self.hostname) } diff --git a/crates/tranquil-pds/tests/security_fixes.rs b/crates/tranquil-pds/tests/security_fixes.rs index a302ae2..6db6874 100644 --- a/crates/tranquil-pds/tests/security_fixes.rs +++ b/crates/tranquil-pds/tests/security_fixes.rs @@ -88,12 +88,17 @@ fn test_signal_username_validation() { assert!(is_valid_signal_username("bob_smith.99")); assert!(is_valid_signal_username("user123.42")); assert!(is_valid_signal_username("lu1.01")); - assert!(is_valid_signal_username("abc.00")); assert!(is_valid_signal_username("a_very_long_username_here.55")); + assert!(is_valid_signal_username("alice.123")); + assert!(is_valid_signal_username("alice.999999999")); + assert!(is_valid_signal_username("alice.18446744073709551615")); assert!(!is_valid_signal_username("alice")); assert!(!is_valid_signal_username("alice.1")); assert!(!is_valid_signal_username("alice.001")); + assert!(!is_valid_signal_username("abc.00")); + assert!(!is_valid_signal_username("alice.0")); + assert!(!is_valid_signal_username("alice.999999999999999999999")); assert!(!is_valid_signal_username(".01")); assert!(!is_valid_signal_username("ab.01")); assert!(!is_valid_signal_username("")); diff --git a/crates/tranquil-signal/src/client.rs b/crates/tranquil-signal/src/client.rs index 244df74..d3c5dbf 100644 --- a/crates/tranquil-signal/src/client.rs +++ b/crates/tranquil-signal/src/client.rs @@ -33,15 +33,11 @@ impl SignalUsername { pub fn parse(username: &str) -> Result { let reject = || Err(InvalidSignalUsername(username.to_string())); - if username.len() < 6 || username.len() > 35 { - return reject(); - } - let Some((base, discriminator)) = username.rsplit_once('.') else { return reject(); }; - if base.len() < 3 || base.len() > 32 { + if !matches!(base.len(), 3..=32) { return reject(); } @@ -53,7 +49,7 @@ impl SignalUsername { return reject(); } - if discriminator.len() != 2 || !discriminator.chars().all(|c| c.is_ascii_digit()) { + if !is_valid_discriminator(discriminator) { return reject(); } @@ -65,6 +61,19 @@ impl SignalUsername { } } +fn is_valid_discriminator(s: &str) -> bool { + if !s.chars().all(|c| c.is_ascii_digit()) { + return false; + } + if !matches!(s.len(), 2..=20) { + return false; + } + if s.len() > 2 && s.starts_with('0') { + return false; + } + s.parse::().is_ok_and(|n| n != 0) +} + impl fmt::Display for SignalUsername { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.0) @@ -115,7 +124,7 @@ impl fmt::Display for MessageTooLong { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "message body too long: {} bytes (max {})", + "message body is {} bytes, max {}", self.len, self.max ) } @@ -368,13 +377,13 @@ impl SignalClient { let req = tokio::select! { biased; _ = shutdown.cancelled() => { - tracing::info!("signal worker shutting down (cancellation)"); + tracing::info!("signal worker cancelled, shutting down"); break; } msg = rx.recv() => match msg { Some(r) => r, None => { - tracing::info!("signal worker shutting down (channel closed)"); + tracing::info!("signal worker channel closed, shutting down"); break; } }, diff --git a/crates/tranquil-signal/src/fjall_store.rs b/crates/tranquil-signal/src/fjall_store.rs index 47b3884..bb5c6b8 100644 --- a/crates/tranquil-signal/src/fjall_store.rs +++ b/crates/tranquil-signal/src/fjall_store.rs @@ -1050,7 +1050,7 @@ impl ContentsStore for FjallSignalStore { .and_then(|v| match <[u8; 32]>::try_from(v.as_ref()) { Ok(arr) => Some(ProfileKey { bytes: arr }), Err(_) => { - warn!(%uuid, len = v.len(), "corrupted profile key (expected 32 bytes)"); + warn!(%uuid, len = v.len(), "corrupted profile key, expected 32 bytes"); None } })) diff --git a/crates/tranquil-signal/src/store.rs b/crates/tranquil-signal/src/store.rs index f36127a..8efe112 100644 --- a/crates/tranquil-signal/src/store.rs +++ b/crates/tranquil-signal/src/store.rs @@ -1117,7 +1117,7 @@ impl ContentsStore for PgSignalStore { Some(r) => match <[u8; 32]>::try_from(r.key.as_slice()) { Ok(arr) => Some(ProfileKey { bytes: arr }), Err(_) => { - warn!(%uuid, len = r.key.len(), "corrupted profile key (expected 32 bytes)"); + warn!(%uuid, len = r.key.len(), "corrupted profile key, expected 32 bytes"); None } }, diff --git a/crates/tranquil-store/src/gauntlet/leak.rs b/crates/tranquil-store/src/gauntlet/leak.rs index 981131b..6977052 100644 --- a/crates/tranquil-store/src/gauntlet/leak.rs +++ b/crates/tranquil-store/src/gauntlet/leak.rs @@ -166,9 +166,8 @@ mod tests { #[test] fn flat_metrics_no_violation() { let cfg = LeakGateConfig::short_for_tests(); - let series: Vec = (0..20) - .map(|i| sample(60_000 + i * 60_000, GIB)) - .collect(); + let series: Vec = + (0..20).map(|i| sample(60_000 + i * 60_000, GIB)).collect(); assert!(evaluate(&series, cfg).is_empty()); } @@ -221,9 +220,8 @@ mod tests { #[test] fn missing_metric_samples_skipped() { let cfg = LeakGateConfig::short_for_tests(); - let mut series: Vec = (0..10) - .map(|i| sample(60_000 + i * 60_000, GIB)) - .collect(); + let mut series: Vec = + (0..10).map(|i| sample(60_000 + i * 60_000, GIB)).collect(); series[3].rss_bytes = None; series[7].rss_bytes = None; assert!(evaluate(&series, cfg).is_empty()); diff --git a/crates/tranquil-store/src/gauntlet/mod.rs b/crates/tranquil-store/src/gauntlet/mod.rs index 8920601..3dffeb6 100644 --- a/crates/tranquil-store/src/gauntlet/mod.rs +++ b/crates/tranquil-store/src/gauntlet/mod.rs @@ -1,5 +1,7 @@ pub mod farm; pub mod invariants; +pub mod leak; +pub mod metrics; pub mod op; pub mod oracle; pub mod overrides; @@ -7,11 +9,14 @@ pub mod regression; pub mod runner; pub mod scenarios; pub mod shrink; +pub mod soak; pub mod workload; pub use invariants::{ EventLogSnapshot, Invariant, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for, }; +pub use leak::{LeakGateBuildError, LeakGateConfig, LeakViolation, evaluate as evaluate_leak_gate}; +pub use metrics::{MetricName, MetricsSample, sample_harness}; pub use op::{ CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed, ValueSeed, @@ -26,6 +31,10 @@ pub use runner::{ }; pub use scenarios::{Scenario, UnknownScenario, config_for}; pub use shrink::{ShrinkOutcome, shrink_failure}; +pub use soak::{ + DEFAULT_CHUNK_OPS, DEFAULT_SAMPLE_INTERVAL_MS, InvariantViolationRecord, SoakConfig, SoakError, + SoakEvent, SoakReport, run_soak, +}; pub use workload::{ ByteRange, DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution, ValueBytes, WorkloadModel, diff --git a/crates/tranquil-store/src/gauntlet/regression.rs b/crates/tranquil-store/src/gauntlet/regression.rs index ebf73b9..1a6f69f 100644 --- a/crates/tranquil-store/src/gauntlet/regression.rs +++ b/crates/tranquil-store/src/gauntlet/regression.rs @@ -97,10 +97,10 @@ impl RegressionRecord { f.sync_all()?; } std::fs::rename(&tmp, &path)?; - if let Some(parent) = path.parent() { - if let Ok(dir) = std::fs::File::open(parent) { - let _ = dir.sync_all(); - } + if let Some(parent) = path.parent() + && let Ok(dir) = std::fs::File::open(parent) + { + let _ = dir.sync_all(); } Ok(path) } diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs index 7ee61e2..c14bf9a 100644 --- a/crates/tranquil-store/src/gauntlet/runner.rs +++ b/crates/tranquil-store/src/gauntlet/runner.rs @@ -126,7 +126,7 @@ impl GauntletReport { } #[derive(Debug, thiserror::Error)] -enum OpError { +pub(super) enum OpError { #[error("mst add: {0}")] MstAdd(String), #[error("mst delete: {0}")] @@ -249,7 +249,7 @@ impl Gauntlet { } } -fn segments_subdir(root: &Path) -> PathBuf { +pub(super) fn segments_subdir(root: &Path) -> PathBuf { root.join("segments") } @@ -373,7 +373,7 @@ async fn run_inner_simulated( } } -fn open_eventlog( +pub(super) fn open_eventlog( io: S, segments_dir: PathBuf, max_segment_size: u64, @@ -586,7 +586,7 @@ where } } -fn eventlog_snapshot( +pub(super) fn eventlog_snapshot( state: Option<&EventLogState>, ) -> Option { let s = state?; @@ -746,7 +746,7 @@ async fn run_quick_check( } } -async fn run_invariants( +pub(super) async fn run_invariants( store: &Arc>, oracle: &Oracle, root: Option, @@ -828,7 +828,7 @@ fn diff_snapshots(pre: &[(CidBytes, u32)], post: &[(CidBytes, u32)]) -> Option( +pub(super) async fn refresh_oracle_graph( store: &Arc>, oracle: &mut Oracle, root: Option, @@ -889,7 +889,7 @@ fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> Restart } } -fn blockstore_config(dir: &std::path::Path, s: &StoreConfig) -> BlockStoreConfig { +pub(super) fn blockstore_config(dir: &std::path::Path, s: &StoreConfig) -> BlockStoreConfig { BlockStoreConfig { data_dir: dir.join("data"), index_dir: dir.join("index"), @@ -954,7 +954,7 @@ fn did_hash_for_seed(seed: DidSeed) -> DidHash { DidHash::from_did(&format!("did:plc:gauntlet{:08x}", seed.0)) } -async fn apply_op( +pub(super) async fn apply_op( harness: &mut Harness, root: &mut Option, oracle: &mut Oracle, @@ -1202,7 +1202,7 @@ fn diff_obsolete( ) -> Result, OpError> { removed_mst_blocks .into_iter() - .chain(removed_cids.into_iter()) + .chain(removed_cids) .map(|c| try_cid_to_fixed(&c)) .collect::>() .map_err(OpError::from) diff --git a/crates/tranquil-store/src/gauntlet/soak.rs b/crates/tranquil-store/src/gauntlet/soak.rs new file mode 100644 index 0000000..d91ae23 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/soak.rs @@ -0,0 +1,326 @@ +use std::io::{self, Write}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use cid::Cid; +use serde::{Deserialize, Serialize}; +use tracing::warn; + +use super::invariants::{InvariantSet, InvariantViolation}; +use super::leak::{LeakGateConfig, LeakViolation, evaluate as evaluate_leak_gate}; +use super::metrics::{MetricsSample, sample_harness}; +use super::op::{OpStream, Seed}; +use super::oracle::Oracle; +use super::runner::{ + EventLogState, GauntletConfig, Harness, IoBackend, apply_op, blockstore_config, + eventlog_snapshot, open_eventlog, refresh_oracle_graph, run_invariants, segments_subdir, +}; +use super::workload::OpCount; +use crate::blockstore::TranquilBlockStore; +use crate::io::{RealIO, StorageIO}; + +const OP_ERROR_LOG_THROTTLE: u64 = 1024; + +pub const DEFAULT_CHUNK_OPS: usize = 5_000; +pub const DEFAULT_SAMPLE_INTERVAL_MS: u64 = 60_000; + +#[derive(Debug, Clone)] +pub struct SoakConfig { + pub gauntlet: GauntletConfig, + pub total_duration: Duration, + pub sample_interval: Duration, + pub chunk_ops: usize, + pub leak_gate: LeakGateConfig, +} + +impl SoakConfig { + pub fn new(gauntlet: GauntletConfig, total_duration: Duration) -> Self { + Self { + gauntlet, + total_duration, + sample_interval: Duration::from_millis(DEFAULT_SAMPLE_INTERVAL_MS), + chunk_ops: DEFAULT_CHUNK_OPS, + leak_gate: LeakGateConfig::standard(), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum SoakError { + #[error(transparent)] + Io(#[from] io::Error), + #[error("soak requires IoBackend::Real; scenario configured Simulated")] + SimulatedBackendRejected, + #[error("open block store: {0}")] + StoreOpen(String), + #[error("open event log: {0}")] + EventLogOpen(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SoakReport { + pub seed: Seed, + pub ops_executed: u64, + pub op_errors: u64, + pub chunks: u64, + pub samples: Vec, + pub invariant_violations: Vec, + pub leak_violations: Vec, + pub total_wall_ms: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InvariantViolationRecord { + pub invariant: String, + pub detail: String, +} + +impl SoakReport { + pub fn is_clean(&self) -> bool { + self.invariant_violations.is_empty() && self.leak_violations.is_empty() + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum SoakEvent { + #[serde(rename = "sample")] + Sample { + seed: u64, + chunk: u64, + ops_executed: u64, + sample: MetricsSample, + }, + #[serde(rename = "invariant_violation")] + Invariant { + seed: u64, + invariant: String, + detail: String, + }, + #[serde(rename = "summary")] + Summary { + seed: u64, + total_wall_ms: u64, + ops_executed: u64, + op_errors: u64, + chunks: u64, + clean: bool, + invariant_violations: usize, + leak_violations: Vec, + }, +} + +pub async fn run_soak( + cfg: SoakConfig, + mut emitter: W, +) -> Result { + if !matches!(cfg.gauntlet.io, IoBackend::Real) { + return Err(SoakError::SimulatedBackendRejected); + } + let dir = tempfile::TempDir::new()?; + let store_cfg = blockstore_config(dir.path(), &cfg.gauntlet.store); + let segments_dir: PathBuf = segments_subdir(dir.path()); + let store = TranquilBlockStore::open(store_cfg) + .map(Arc::new) + .map_err(|e| SoakError::StoreOpen(e.to_string()))?; + let eventlog: Option> = match cfg.gauntlet.eventlog { + None => None, + Some(ec) => Some( + open_eventlog(RealIO::new(), segments_dir, ec.max_segment_size.0) + .map_err(|e| SoakError::EventLogOpen(e.to_string()))?, + ), + }; + let mut harness = Harness { store, eventlog }; + let outcome = drive_soak(&mut harness, &cfg, &mut emitter).await; + shutdown_harness(&mut harness); + outcome +} + +fn shutdown_harness(harness: &mut Harness) { + if let Some(el) = harness.eventlog.as_mut() { + if let Err(e) = el.writer.shutdown() { + warn!(error = %e, "soak: event log writer shutdown failed"); + } + el.manager.shutdown(); + } +} + +async fn drive_soak( + harness: &mut Harness, + cfg: &SoakConfig, + emitter: &mut W, +) -> Result +where + S: StorageIO + Send + Sync + 'static, + W: Write + Send, +{ + let mut oracle = Oracle::new(); + let mut root: Option = None; + + let start = Instant::now(); + let mut samples: Vec = Vec::new(); + let mut invariant_records: Vec = Vec::new(); + let mut ops_executed: u64 = 0; + let mut op_errors: u64 = 0; + let mut chunks: u64 = 0; + let mut last_sample = start; + let mut next_error_log_at: u64 = 1; + + let initial = sample_harness(harness, Duration::ZERO); + emit_event( + emitter, + &SoakEvent::Sample { + seed: cfg.gauntlet.seed.0, + chunk: 0, + ops_executed: 0, + sample: initial, + }, + )?; + samples.push(initial); + + while start.elapsed() < cfg.total_duration { + let chunk_seed = Seed( + cfg.gauntlet + .seed + .0 + .wrapping_add(chunks.wrapping_mul(0x9E37_79B9_7F4A_7C15)), + ); + let stream: OpStream = cfg + .gauntlet + .workload + .generate(chunk_seed, OpCount(cfg.chunk_ops)); + for op in stream.iter() { + if start.elapsed() >= cfg.total_duration { + break; + } + match apply_op(harness, &mut root, &mut oracle, op, &cfg.gauntlet.workload).await { + Ok(()) => { + ops_executed = ops_executed.saturating_add(1); + } + Err(e) => { + op_errors = op_errors.saturating_add(1); + if op_errors >= next_error_log_at { + warn!( + op_errors, + ops_executed, + elapsed_ms = u64::try_from(start.elapsed().as_millis()) + .unwrap_or(u64::MAX), + error = %e, + "soak: op error milestone" + ); + next_error_log_at = next_error_log_at + .saturating_mul(2) + .max(OP_ERROR_LOG_THROTTLE); + } + } + } + if last_sample.elapsed() >= cfg.sample_interval { + let elapsed = start.elapsed(); + let s = sample_harness(harness, elapsed); + emit_event( + emitter, + &SoakEvent::Sample { + seed: cfg.gauntlet.seed.0, + chunk: chunks, + ops_executed, + sample: s, + }, + )?; + samples.push(s); + last_sample = Instant::now(); + } + } + chunks = chunks.saturating_add(1); + tokio::task::yield_now().await; + } + + let final_elapsed = start.elapsed(); + let final_sample = sample_harness(harness, final_elapsed); + emit_event( + emitter, + &SoakEvent::Sample { + seed: cfg.gauntlet.seed.0, + chunk: chunks, + ops_executed, + sample: final_sample, + }, + )?; + samples.push(final_sample); + + let invariants = match refresh_oracle_graph(&harness.store, &mut oracle, root).await { + Ok(()) => { + let snapshot = eventlog_snapshot(harness.eventlog.as_ref()); + let set = cfg + .gauntlet + .invariants + .without(InvariantSet::RESTART_IDEMPOTENT); + run_invariants(&harness.store, &oracle, root, snapshot, set).await + } + Err(e) => vec![InvariantViolation { + invariant: "MstRootDurability", + detail: format!("refresh: {e}"), + }], + }; + for v in invariants.iter() { + let rec = InvariantViolationRecord { + invariant: v.invariant.to_string(), + detail: v.detail.clone(), + }; + emit_event( + emitter, + &SoakEvent::Invariant { + seed: cfg.gauntlet.seed.0, + invariant: rec.invariant.clone(), + detail: rec.detail.clone(), + }, + )?; + invariant_records.push(rec); + } + + let leak_violations = evaluate_leak_gate(&samples, cfg.leak_gate); + let total_wall_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); + let clean = invariant_records.is_empty() && leak_violations.is_empty(); + emit_event( + emitter, + &SoakEvent::Summary { + seed: cfg.gauntlet.seed.0, + total_wall_ms, + ops_executed, + op_errors, + chunks, + clean, + invariant_violations: invariant_records.len(), + leak_violations: leak_violations.clone(), + }, + )?; + + Ok(SoakReport { + seed: cfg.gauntlet.seed, + ops_executed, + op_errors, + chunks, + samples, + invariant_violations: invariant_records, + leak_violations, + total_wall_ms, + }) +} + +fn emit_event(emitter: &mut W, event: &SoakEvent) -> io::Result<()> { + let line = serde_json::to_string(event).map_err(io::Error::other)?; + writeln!(emitter, "{line}")?; + emitter.flush() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn send_sync() {} + + #[test] + fn soak_error_is_send_sync() { + send_sync::(); + } +} diff --git a/crates/tranquil-store/tests/gauntlet_soak.rs b/crates/tranquil-store/tests/gauntlet_soak.rs new file mode 100644 index 0000000..f991630 --- /dev/null +++ b/crates/tranquil-store/tests/gauntlet_soak.rs @@ -0,0 +1,119 @@ +use std::io::{self, BufWriter, Write}; +use std::time::Duration; + +use tranquil_store::gauntlet::{ + LeakGateConfig, Scenario, Seed, SoakConfig, SoakReport, config_for, run_soak, +}; + +fn soak_hours() -> Option { + std::env::var("GAUNTLET_SOAK_HOURS") + .ok() + .and_then(|s| s.parse::().ok()) + .filter(|h| h.is_finite() && *h > 0.0) +} + +fn soak_sample_interval() -> Duration { + std::env::var("GAUNTLET_SOAK_SAMPLE_SECS") + .ok() + .and_then(|s| s.parse::().ok()) + .filter(|s| *s > 0) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(60)) +} + +fn emitter_stream() -> Box { + match std::env::var("GAUNTLET_SOAK_OUTPUT").ok().as_deref() { + Some(path) if !path.is_empty() => { + let f = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + .expect("open GAUNTLET_SOAK_OUTPUT target"); + Box::new(BufWriter::new(f)) + } + _ => Box::new(BufWriter::new(io::stderr())), + } +} + +fn report_summary(report: &SoakReport) -> String { + let leaks: Vec = report + .leak_violations + .iter() + .map(|v| { + format!( + "{}: {} -> {} ({}% over {}ms window, limit {}%)", + v.metric, + v.start_value, + v.end_value, + v.growth_pct.round() as i64, + v.end_ms - v.start_ms, + v.limit_pct + ) + }) + .collect(); + let invariants: Vec = report + .invariant_violations + .iter() + .map(|v| format!("{}: {}", v.invariant, v.detail)) + .collect(); + format!( + "seed={:016x} ops={} chunks={} errors={} wall_ms={} leaks=[{}] invariants=[{}]", + report.seed.0, + report.ops_executed, + report.chunks, + report.op_errors, + report.total_wall_ms, + leaks.join(" ; "), + invariants.join(" ; "), + ) +} + +#[tokio::test] +async fn soak_short_smoke() { + let cfg = SoakConfig { + gauntlet: config_for(Scenario::SmokePR, Seed(7)), + total_duration: Duration::from_secs(10), + sample_interval: Duration::from_secs(2), + chunk_ops: 200, + leak_gate: LeakGateConfig::try_new(0, 60_000, 1000.0).expect("valid leak gate"), + }; + let mut buf: Vec = Vec::new(); + let report = run_soak(cfg, &mut buf).await.expect("soak run"); + assert!( + report.samples.len() >= 3, + "expected at least initial + periodic + final samples, got {}", + report.samples.len() + ); + assert!( + report.ops_executed > 0, + "expected ops executed, got {}", + report.ops_executed + ); + let text = String::from_utf8(buf).expect("utf8 ndjson"); + assert!( + text.contains("\"type\":\"summary\""), + "ndjson must include summary line; got {text}" + ); +} + +#[tokio::test] +#[ignore = "configurable via GAUNTLET_SOAK_HOURS; default 24h leak gate (1h warmup, 4h window, 5% limit)"] +async fn soak_long_leak_gate() { + let hours = soak_hours().unwrap_or(24.0); + let total = Duration::from_secs_f64(hours * 3600.0); + let cfg = SoakConfig { + gauntlet: config_for(Scenario::MstChurn, Seed(0)), + total_duration: total, + sample_interval: soak_sample_interval(), + chunk_ops: 10_000, + leak_gate: LeakGateConfig::standard(), + }; + let mut emitter = emitter_stream(); + let report = run_soak(cfg, &mut emitter).await.expect("soak run"); + let _ = emitter.flush(); + assert!( + report.is_clean(), + "soak failed: {}", + report_summary(&report) + ); +} diff --git a/crates/tranquil-store/tests/rotation_robustness.rs b/crates/tranquil-store/tests/rotation_robustness.rs index 97eae32..ed97fa7 100644 --- a/crates/tranquil-store/tests/rotation_robustness.rs +++ b/crates/tranquil-store/tests/rotation_robustness.rs @@ -203,7 +203,7 @@ fn concurrent_reader_survives_evict_handle() { let write_handle = manager.open_for_append(file_id).unwrap(); { let mut writer = DataFileWriter::new(&*sim, write_handle.fd(), file_id).unwrap(); - let _ = writer.append_block(&test_cid(1), &vec![0x11; 128]).unwrap(); + let _ = writer.append_block(&test_cid(1), &[0x11; 128]).unwrap(); writer.sync().unwrap(); } drop(write_handle); diff --git a/crates/tranquil-store/tests/sim_blockstore.rs b/crates/tranquil-store/tests/sim_blockstore.rs index 70ed787..d6c6ef0 100644 --- a/crates/tranquil-store/tests/sim_blockstore.rs +++ b/crates/tranquil-store/tests/sim_blockstore.rs @@ -517,8 +517,8 @@ fn sim_aggressive_faults_data_integrity() { return; }; let start_pos = writer.position(); - drop(writer); - drop(handle); + let _ = writer; + let _ = handle; let mut rng = Rng::new(seed); let block_count = (rng.range_u32(15) + 5) as u16; diff --git a/crates/tranquil-store/tests/verify_rollback_orphan.rs b/crates/tranquil-store/tests/verify_rollback_orphan.rs index ae0bdf6..7ef036c 100644 --- a/crates/tranquil-store/tests/verify_rollback_orphan.rs +++ b/crates/tranquil-store/tests/verify_rollback_orphan.rs @@ -64,8 +64,8 @@ fn rollback_rotation_does_not_leave_orphan_data_file() { io.sync_dir(&data_dir).unwrap(); let _ = io.delete(&hint_file_path(&data_dir, next_id)); - drop(writer); - drop(next_handle); + let _ = writer; + let _ = next_handle; manager.rollback_rotation(next_id); } diff --git a/example.toml b/example.toml index 2a5d7b1..5353aba 100644 --- a/example.toml +++ b/example.toml @@ -1,5 +1,5 @@ [server] -# Public hostname of the PDS (e.g. `pds.example.com`). +# Public hostname of the PDS, such as `pds.example.com`. # # Can also be specified via environment variable `PDS_HOSTNAME`. #