mirror of
https://tangled.org/tranquil.farm/tranquil-pds
synced 2026-04-24 10:20:29 +00:00
feat(tranquil-store): soak harness driving leak gate, signal tweaks
Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
@@ -45,6 +45,7 @@ slow-timeout = { period = "600s", terminate-after = 1 }
|
||||
retries = 0
|
||||
fail-fast = false
|
||||
test-threads = 1
|
||||
slow-timeout = { period = "5m", terminate-after = 1000 }
|
||||
|
||||
[test-groups]
|
||||
serial-env-tests = { max-threads = 1 }
|
||||
|
||||
@@ -76,7 +76,7 @@ pub async fn link_signal_device(
|
||||
let result = tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.cancelled() => {
|
||||
tracing::info!("Signal linking aborted due to server shutdown");
|
||||
tracing::info!("server shutting down, aborting signal linking");
|
||||
return;
|
||||
}
|
||||
r = link_result.completion => r,
|
||||
@@ -84,10 +84,10 @@ pub async fn link_signal_device(
|
||||
match result {
|
||||
Ok(Ok(client)) => {
|
||||
if slot_for_task.complete_link(generation, client).await {
|
||||
tracing::info!("Signal device linked successfully");
|
||||
tracing::info!("signal device linked");
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"Signal link completed but generation mismatch or already linked; discarding"
|
||||
"discarding completed signal link, generation mismatch or already linked"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,7 +282,7 @@ async fn process_messaging_channel_update(
|
||||
"Invalid Telegram username. Must be 5-32 characters, alphanumeric or underscore".into(),
|
||||
),
|
||||
CommsChannel::Signal => ApiError::InvalidRequest(
|
||||
"Invalid Signal username. Must be 3-32 characters followed by .XX (e.g. username.01)".into(),
|
||||
"Invalid Signal username. Must be a 3-32 character nickname, a dot, then a 2-20 digit discriminator".into(),
|
||||
),
|
||||
CommsChannel::Email => ApiError::InvalidEmail,
|
||||
});
|
||||
|
||||
@@ -299,7 +299,7 @@ impl DiscordSender {
|
||||
"type": 1,
|
||||
"options": [{
|
||||
"name": "handle",
|
||||
"description": "Your PDS handle (e.g. alice.example.com)",
|
||||
"description": "Your PDS handle",
|
||||
"type": 3,
|
||||
"required": false
|
||||
}]
|
||||
|
||||
@@ -394,7 +394,7 @@ impl TranquilConfig {
|
||||
|
||||
#[derive(Debug, Config)]
|
||||
pub struct ServerConfig {
|
||||
/// Public hostname of the PDS (e.g. `pds.example.com`).
|
||||
/// Public hostname of the PDS, such as `pds.example.com`.
|
||||
#[config(env = "PDS_HOSTNAME")]
|
||||
pub hostname: String,
|
||||
|
||||
@@ -463,8 +463,7 @@ impl ServerConfig {
|
||||
format!("https://{}", self.hostname)
|
||||
}
|
||||
|
||||
/// Hostname without port suffix (e.g. `pds.example.com` from
|
||||
/// `pds.example.com:443`).
|
||||
/// Hostname without port suffix. Returns `pds.example.com` from `pds.example.com:443`.
|
||||
pub fn hostname_without_port(&self) -> &str {
|
||||
self.hostname.split(':').next().unwrap_or(&self.hostname)
|
||||
}
|
||||
|
||||
@@ -88,12 +88,17 @@ fn test_signal_username_validation() {
|
||||
assert!(is_valid_signal_username("bob_smith.99"));
|
||||
assert!(is_valid_signal_username("user123.42"));
|
||||
assert!(is_valid_signal_username("lu1.01"));
|
||||
assert!(is_valid_signal_username("abc.00"));
|
||||
assert!(is_valid_signal_username("a_very_long_username_here.55"));
|
||||
assert!(is_valid_signal_username("alice.123"));
|
||||
assert!(is_valid_signal_username("alice.999999999"));
|
||||
assert!(is_valid_signal_username("alice.18446744073709551615"));
|
||||
|
||||
assert!(!is_valid_signal_username("alice"));
|
||||
assert!(!is_valid_signal_username("alice.1"));
|
||||
assert!(!is_valid_signal_username("alice.001"));
|
||||
assert!(!is_valid_signal_username("abc.00"));
|
||||
assert!(!is_valid_signal_username("alice.0"));
|
||||
assert!(!is_valid_signal_username("alice.999999999999999999999"));
|
||||
assert!(!is_valid_signal_username(".01"));
|
||||
assert!(!is_valid_signal_username("ab.01"));
|
||||
assert!(!is_valid_signal_username(""));
|
||||
|
||||
@@ -33,15 +33,11 @@ impl SignalUsername {
|
||||
pub fn parse(username: &str) -> Result<Self, InvalidSignalUsername> {
|
||||
let reject = || Err(InvalidSignalUsername(username.to_string()));
|
||||
|
||||
if username.len() < 6 || username.len() > 35 {
|
||||
return reject();
|
||||
}
|
||||
|
||||
let Some((base, discriminator)) = username.rsplit_once('.') else {
|
||||
return reject();
|
||||
};
|
||||
|
||||
if base.len() < 3 || base.len() > 32 {
|
||||
if !matches!(base.len(), 3..=32) {
|
||||
return reject();
|
||||
}
|
||||
|
||||
@@ -53,7 +49,7 @@ impl SignalUsername {
|
||||
return reject();
|
||||
}
|
||||
|
||||
if discriminator.len() != 2 || !discriminator.chars().all(|c| c.is_ascii_digit()) {
|
||||
if !is_valid_discriminator(discriminator) {
|
||||
return reject();
|
||||
}
|
||||
|
||||
@@ -65,6 +61,19 @@ impl SignalUsername {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_valid_discriminator(s: &str) -> bool {
|
||||
if !s.chars().all(|c| c.is_ascii_digit()) {
|
||||
return false;
|
||||
}
|
||||
if !matches!(s.len(), 2..=20) {
|
||||
return false;
|
||||
}
|
||||
if s.len() > 2 && s.starts_with('0') {
|
||||
return false;
|
||||
}
|
||||
s.parse::<u64>().is_ok_and(|n| n != 0)
|
||||
}
|
||||
|
||||
impl fmt::Display for SignalUsername {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
@@ -115,7 +124,7 @@ impl fmt::Display for MessageTooLong {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"message body too long: {} bytes (max {})",
|
||||
"message body is {} bytes, max {}",
|
||||
self.len, self.max
|
||||
)
|
||||
}
|
||||
@@ -368,13 +377,13 @@ impl SignalClient {
|
||||
let req = tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.cancelled() => {
|
||||
tracing::info!("signal worker shutting down (cancellation)");
|
||||
tracing::info!("signal worker cancelled, shutting down");
|
||||
break;
|
||||
}
|
||||
msg = rx.recv() => match msg {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
tracing::info!("signal worker shutting down (channel closed)");
|
||||
tracing::info!("signal worker channel closed, shutting down");
|
||||
break;
|
||||
}
|
||||
},
|
||||
|
||||
@@ -1050,7 +1050,7 @@ impl ContentsStore for FjallSignalStore {
|
||||
.and_then(|v| match <[u8; 32]>::try_from(v.as_ref()) {
|
||||
Ok(arr) => Some(ProfileKey { bytes: arr }),
|
||||
Err(_) => {
|
||||
warn!(%uuid, len = v.len(), "corrupted profile key (expected 32 bytes)");
|
||||
warn!(%uuid, len = v.len(), "corrupted profile key, expected 32 bytes");
|
||||
None
|
||||
}
|
||||
}))
|
||||
|
||||
@@ -1117,7 +1117,7 @@ impl ContentsStore for PgSignalStore {
|
||||
Some(r) => match <[u8; 32]>::try_from(r.key.as_slice()) {
|
||||
Ok(arr) => Some(ProfileKey { bytes: arr }),
|
||||
Err(_) => {
|
||||
warn!(%uuid, len = r.key.len(), "corrupted profile key (expected 32 bytes)");
|
||||
warn!(%uuid, len = r.key.len(), "corrupted profile key, expected 32 bytes");
|
||||
None
|
||||
}
|
||||
},
|
||||
|
||||
@@ -166,9 +166,8 @@ mod tests {
|
||||
#[test]
|
||||
fn flat_metrics_no_violation() {
|
||||
let cfg = LeakGateConfig::short_for_tests();
|
||||
let series: Vec<MetricsSample> = (0..20)
|
||||
.map(|i| sample(60_000 + i * 60_000, GIB))
|
||||
.collect();
|
||||
let series: Vec<MetricsSample> =
|
||||
(0..20).map(|i| sample(60_000 + i * 60_000, GIB)).collect();
|
||||
assert!(evaluate(&series, cfg).is_empty());
|
||||
}
|
||||
|
||||
@@ -221,9 +220,8 @@ mod tests {
|
||||
#[test]
|
||||
fn missing_metric_samples_skipped() {
|
||||
let cfg = LeakGateConfig::short_for_tests();
|
||||
let mut series: Vec<MetricsSample> = (0..10)
|
||||
.map(|i| sample(60_000 + i * 60_000, GIB))
|
||||
.collect();
|
||||
let mut series: Vec<MetricsSample> =
|
||||
(0..10).map(|i| sample(60_000 + i * 60_000, GIB)).collect();
|
||||
series[3].rss_bytes = None;
|
||||
series[7].rss_bytes = None;
|
||||
assert!(evaluate(&series, cfg).is_empty());
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
pub mod farm;
|
||||
pub mod invariants;
|
||||
pub mod leak;
|
||||
pub mod metrics;
|
||||
pub mod op;
|
||||
pub mod oracle;
|
||||
pub mod overrides;
|
||||
@@ -7,11 +9,14 @@ pub mod regression;
|
||||
pub mod runner;
|
||||
pub mod scenarios;
|
||||
pub mod shrink;
|
||||
pub mod soak;
|
||||
pub mod workload;
|
||||
|
||||
pub use invariants::{
|
||||
EventLogSnapshot, Invariant, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for,
|
||||
};
|
||||
pub use leak::{LeakGateBuildError, LeakGateConfig, LeakViolation, evaluate as evaluate_leak_gate};
|
||||
pub use metrics::{MetricName, MetricsSample, sample_harness};
|
||||
pub use op::{
|
||||
CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed,
|
||||
ValueSeed,
|
||||
@@ -26,6 +31,10 @@ pub use runner::{
|
||||
};
|
||||
pub use scenarios::{Scenario, UnknownScenario, config_for};
|
||||
pub use shrink::{ShrinkOutcome, shrink_failure};
|
||||
pub use soak::{
|
||||
DEFAULT_CHUNK_OPS, DEFAULT_SAMPLE_INTERVAL_MS, InvariantViolationRecord, SoakConfig, SoakError,
|
||||
SoakEvent, SoakReport, run_soak,
|
||||
};
|
||||
pub use workload::{
|
||||
ByteRange, DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution,
|
||||
ValueBytes, WorkloadModel,
|
||||
|
||||
@@ -97,10 +97,10 @@ impl RegressionRecord {
|
||||
f.sync_all()?;
|
||||
}
|
||||
std::fs::rename(&tmp, &path)?;
|
||||
if let Some(parent) = path.parent() {
|
||||
if let Ok(dir) = std::fs::File::open(parent) {
|
||||
let _ = dir.sync_all();
|
||||
}
|
||||
if let Some(parent) = path.parent()
|
||||
&& let Ok(dir) = std::fs::File::open(parent)
|
||||
{
|
||||
let _ = dir.sync_all();
|
||||
}
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ impl GauntletReport {
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum OpError {
|
||||
pub(super) enum OpError {
|
||||
#[error("mst add: {0}")]
|
||||
MstAdd(String),
|
||||
#[error("mst delete: {0}")]
|
||||
@@ -249,7 +249,7 @@ impl Gauntlet {
|
||||
}
|
||||
}
|
||||
|
||||
fn segments_subdir(root: &Path) -> PathBuf {
|
||||
pub(super) fn segments_subdir(root: &Path) -> PathBuf {
|
||||
root.join("segments")
|
||||
}
|
||||
|
||||
@@ -373,7 +373,7 @@ async fn run_inner_simulated(
|
||||
}
|
||||
}
|
||||
|
||||
fn open_eventlog<S: StorageIO + Send + Sync + 'static>(
|
||||
pub(super) fn open_eventlog<S: StorageIO + Send + Sync + 'static>(
|
||||
io: S,
|
||||
segments_dir: PathBuf,
|
||||
max_segment_size: u64,
|
||||
@@ -586,7 +586,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn eventlog_snapshot<S: StorageIO + Send + Sync + 'static>(
|
||||
pub(super) fn eventlog_snapshot<S: StorageIO + Send + Sync + 'static>(
|
||||
state: Option<&EventLogState<S>>,
|
||||
) -> Option<EventLogSnapshot> {
|
||||
let s = state?;
|
||||
@@ -746,7 +746,7 @@ async fn run_quick_check<S: StorageIO + Send + Sync + 'static>(
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_invariants<S: StorageIO + Send + Sync + 'static>(
|
||||
pub(super) async fn run_invariants<S: StorageIO + Send + Sync + 'static>(
|
||||
store: &Arc<TranquilBlockStore<S>>,
|
||||
oracle: &Oracle,
|
||||
root: Option<Cid>,
|
||||
@@ -828,7 +828,7 @@ fn diff_snapshots(pre: &[(CidBytes, u32)], post: &[(CidBytes, u32)]) -> Option<S
|
||||
))
|
||||
}
|
||||
|
||||
async fn refresh_oracle_graph<S: StorageIO + Send + Sync + 'static>(
|
||||
pub(super) async fn refresh_oracle_graph<S: StorageIO + Send + Sync + 'static>(
|
||||
store: &Arc<TranquilBlockStore<S>>,
|
||||
oracle: &mut Oracle,
|
||||
root: Option<Cid>,
|
||||
@@ -889,7 +889,7 @@ fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> Restart
|
||||
}
|
||||
}
|
||||
|
||||
fn blockstore_config(dir: &std::path::Path, s: &StoreConfig) -> BlockStoreConfig {
|
||||
pub(super) fn blockstore_config(dir: &std::path::Path, s: &StoreConfig) -> BlockStoreConfig {
|
||||
BlockStoreConfig {
|
||||
data_dir: dir.join("data"),
|
||||
index_dir: dir.join("index"),
|
||||
@@ -954,7 +954,7 @@ fn did_hash_for_seed(seed: DidSeed) -> DidHash {
|
||||
DidHash::from_did(&format!("did:plc:gauntlet{:08x}", seed.0))
|
||||
}
|
||||
|
||||
async fn apply_op<S: StorageIO + Send + Sync + 'static>(
|
||||
pub(super) async fn apply_op<S: StorageIO + Send + Sync + 'static>(
|
||||
harness: &mut Harness<S>,
|
||||
root: &mut Option<Cid>,
|
||||
oracle: &mut Oracle,
|
||||
@@ -1202,7 +1202,7 @@ fn diff_obsolete(
|
||||
) -> Result<Vec<CidBytes>, OpError> {
|
||||
removed_mst_blocks
|
||||
.into_iter()
|
||||
.chain(removed_cids.into_iter())
|
||||
.chain(removed_cids)
|
||||
.map(|c| try_cid_to_fixed(&c))
|
||||
.collect::<Result<_, _>>()
|
||||
.map_err(OpError::from)
|
||||
|
||||
326
crates/tranquil-store/src/gauntlet/soak.rs
Normal file
326
crates/tranquil-store/src/gauntlet/soak.rs
Normal file
@@ -0,0 +1,326 @@
|
||||
use std::io::{self, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use cid::Cid;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::warn;
|
||||
|
||||
use super::invariants::{InvariantSet, InvariantViolation};
|
||||
use super::leak::{LeakGateConfig, LeakViolation, evaluate as evaluate_leak_gate};
|
||||
use super::metrics::{MetricsSample, sample_harness};
|
||||
use super::op::{OpStream, Seed};
|
||||
use super::oracle::Oracle;
|
||||
use super::runner::{
|
||||
EventLogState, GauntletConfig, Harness, IoBackend, apply_op, blockstore_config,
|
||||
eventlog_snapshot, open_eventlog, refresh_oracle_graph, run_invariants, segments_subdir,
|
||||
};
|
||||
use super::workload::OpCount;
|
||||
use crate::blockstore::TranquilBlockStore;
|
||||
use crate::io::{RealIO, StorageIO};
|
||||
|
||||
const OP_ERROR_LOG_THROTTLE: u64 = 1024;
|
||||
|
||||
pub const DEFAULT_CHUNK_OPS: usize = 5_000;
|
||||
pub const DEFAULT_SAMPLE_INTERVAL_MS: u64 = 60_000;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SoakConfig {
|
||||
pub gauntlet: GauntletConfig,
|
||||
pub total_duration: Duration,
|
||||
pub sample_interval: Duration,
|
||||
pub chunk_ops: usize,
|
||||
pub leak_gate: LeakGateConfig,
|
||||
}
|
||||
|
||||
impl SoakConfig {
|
||||
pub fn new(gauntlet: GauntletConfig, total_duration: Duration) -> Self {
|
||||
Self {
|
||||
gauntlet,
|
||||
total_duration,
|
||||
sample_interval: Duration::from_millis(DEFAULT_SAMPLE_INTERVAL_MS),
|
||||
chunk_ops: DEFAULT_CHUNK_OPS,
|
||||
leak_gate: LeakGateConfig::standard(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum SoakError {
|
||||
#[error(transparent)]
|
||||
Io(#[from] io::Error),
|
||||
#[error("soak requires IoBackend::Real; scenario configured Simulated")]
|
||||
SimulatedBackendRejected,
|
||||
#[error("open block store: {0}")]
|
||||
StoreOpen(String),
|
||||
#[error("open event log: {0}")]
|
||||
EventLogOpen(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SoakReport {
|
||||
pub seed: Seed,
|
||||
pub ops_executed: u64,
|
||||
pub op_errors: u64,
|
||||
pub chunks: u64,
|
||||
pub samples: Vec<MetricsSample>,
|
||||
pub invariant_violations: Vec<InvariantViolationRecord>,
|
||||
pub leak_violations: Vec<LeakViolation>,
|
||||
pub total_wall_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct InvariantViolationRecord {
|
||||
pub invariant: String,
|
||||
pub detail: String,
|
||||
}
|
||||
|
||||
impl SoakReport {
|
||||
pub fn is_clean(&self) -> bool {
|
||||
self.invariant_violations.is_empty() && self.leak_violations.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum SoakEvent {
|
||||
#[serde(rename = "sample")]
|
||||
Sample {
|
||||
seed: u64,
|
||||
chunk: u64,
|
||||
ops_executed: u64,
|
||||
sample: MetricsSample,
|
||||
},
|
||||
#[serde(rename = "invariant_violation")]
|
||||
Invariant {
|
||||
seed: u64,
|
||||
invariant: String,
|
||||
detail: String,
|
||||
},
|
||||
#[serde(rename = "summary")]
|
||||
Summary {
|
||||
seed: u64,
|
||||
total_wall_ms: u64,
|
||||
ops_executed: u64,
|
||||
op_errors: u64,
|
||||
chunks: u64,
|
||||
clean: bool,
|
||||
invariant_violations: usize,
|
||||
leak_violations: Vec<LeakViolation>,
|
||||
},
|
||||
}
|
||||
|
||||
pub async fn run_soak<W: Write + Send>(
|
||||
cfg: SoakConfig,
|
||||
mut emitter: W,
|
||||
) -> Result<SoakReport, SoakError> {
|
||||
if !matches!(cfg.gauntlet.io, IoBackend::Real) {
|
||||
return Err(SoakError::SimulatedBackendRejected);
|
||||
}
|
||||
let dir = tempfile::TempDir::new()?;
|
||||
let store_cfg = blockstore_config(dir.path(), &cfg.gauntlet.store);
|
||||
let segments_dir: PathBuf = segments_subdir(dir.path());
|
||||
let store = TranquilBlockStore::open(store_cfg)
|
||||
.map(Arc::new)
|
||||
.map_err(|e| SoakError::StoreOpen(e.to_string()))?;
|
||||
let eventlog: Option<EventLogState<RealIO>> = match cfg.gauntlet.eventlog {
|
||||
None => None,
|
||||
Some(ec) => Some(
|
||||
open_eventlog(RealIO::new(), segments_dir, ec.max_segment_size.0)
|
||||
.map_err(|e| SoakError::EventLogOpen(e.to_string()))?,
|
||||
),
|
||||
};
|
||||
let mut harness = Harness { store, eventlog };
|
||||
let outcome = drive_soak(&mut harness, &cfg, &mut emitter).await;
|
||||
shutdown_harness(&mut harness);
|
||||
outcome
|
||||
}
|
||||
|
||||
fn shutdown_harness<S: StorageIO + Send + Sync + 'static>(harness: &mut Harness<S>) {
|
||||
if let Some(el) = harness.eventlog.as_mut() {
|
||||
if let Err(e) = el.writer.shutdown() {
|
||||
warn!(error = %e, "soak: event log writer shutdown failed");
|
||||
}
|
||||
el.manager.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
async fn drive_soak<S, W>(
|
||||
harness: &mut Harness<S>,
|
||||
cfg: &SoakConfig,
|
||||
emitter: &mut W,
|
||||
) -> Result<SoakReport, SoakError>
|
||||
where
|
||||
S: StorageIO + Send + Sync + 'static,
|
||||
W: Write + Send,
|
||||
{
|
||||
let mut oracle = Oracle::new();
|
||||
let mut root: Option<Cid> = None;
|
||||
|
||||
let start = Instant::now();
|
||||
let mut samples: Vec<MetricsSample> = Vec::new();
|
||||
let mut invariant_records: Vec<InvariantViolationRecord> = Vec::new();
|
||||
let mut ops_executed: u64 = 0;
|
||||
let mut op_errors: u64 = 0;
|
||||
let mut chunks: u64 = 0;
|
||||
let mut last_sample = start;
|
||||
let mut next_error_log_at: u64 = 1;
|
||||
|
||||
let initial = sample_harness(harness, Duration::ZERO);
|
||||
emit_event(
|
||||
emitter,
|
||||
&SoakEvent::Sample {
|
||||
seed: cfg.gauntlet.seed.0,
|
||||
chunk: 0,
|
||||
ops_executed: 0,
|
||||
sample: initial,
|
||||
},
|
||||
)?;
|
||||
samples.push(initial);
|
||||
|
||||
while start.elapsed() < cfg.total_duration {
|
||||
let chunk_seed = Seed(
|
||||
cfg.gauntlet
|
||||
.seed
|
||||
.0
|
||||
.wrapping_add(chunks.wrapping_mul(0x9E37_79B9_7F4A_7C15)),
|
||||
);
|
||||
let stream: OpStream = cfg
|
||||
.gauntlet
|
||||
.workload
|
||||
.generate(chunk_seed, OpCount(cfg.chunk_ops));
|
||||
for op in stream.iter() {
|
||||
if start.elapsed() >= cfg.total_duration {
|
||||
break;
|
||||
}
|
||||
match apply_op(harness, &mut root, &mut oracle, op, &cfg.gauntlet.workload).await {
|
||||
Ok(()) => {
|
||||
ops_executed = ops_executed.saturating_add(1);
|
||||
}
|
||||
Err(e) => {
|
||||
op_errors = op_errors.saturating_add(1);
|
||||
if op_errors >= next_error_log_at {
|
||||
warn!(
|
||||
op_errors,
|
||||
ops_executed,
|
||||
elapsed_ms = u64::try_from(start.elapsed().as_millis())
|
||||
.unwrap_or(u64::MAX),
|
||||
error = %e,
|
||||
"soak: op error milestone"
|
||||
);
|
||||
next_error_log_at = next_error_log_at
|
||||
.saturating_mul(2)
|
||||
.max(OP_ERROR_LOG_THROTTLE);
|
||||
}
|
||||
}
|
||||
}
|
||||
if last_sample.elapsed() >= cfg.sample_interval {
|
||||
let elapsed = start.elapsed();
|
||||
let s = sample_harness(harness, elapsed);
|
||||
emit_event(
|
||||
emitter,
|
||||
&SoakEvent::Sample {
|
||||
seed: cfg.gauntlet.seed.0,
|
||||
chunk: chunks,
|
||||
ops_executed,
|
||||
sample: s,
|
||||
},
|
||||
)?;
|
||||
samples.push(s);
|
||||
last_sample = Instant::now();
|
||||
}
|
||||
}
|
||||
chunks = chunks.saturating_add(1);
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
let final_elapsed = start.elapsed();
|
||||
let final_sample = sample_harness(harness, final_elapsed);
|
||||
emit_event(
|
||||
emitter,
|
||||
&SoakEvent::Sample {
|
||||
seed: cfg.gauntlet.seed.0,
|
||||
chunk: chunks,
|
||||
ops_executed,
|
||||
sample: final_sample,
|
||||
},
|
||||
)?;
|
||||
samples.push(final_sample);
|
||||
|
||||
let invariants = match refresh_oracle_graph(&harness.store, &mut oracle, root).await {
|
||||
Ok(()) => {
|
||||
let snapshot = eventlog_snapshot(harness.eventlog.as_ref());
|
||||
let set = cfg
|
||||
.gauntlet
|
||||
.invariants
|
||||
.without(InvariantSet::RESTART_IDEMPOTENT);
|
||||
run_invariants(&harness.store, &oracle, root, snapshot, set).await
|
||||
}
|
||||
Err(e) => vec![InvariantViolation {
|
||||
invariant: "MstRootDurability",
|
||||
detail: format!("refresh: {e}"),
|
||||
}],
|
||||
};
|
||||
for v in invariants.iter() {
|
||||
let rec = InvariantViolationRecord {
|
||||
invariant: v.invariant.to_string(),
|
||||
detail: v.detail.clone(),
|
||||
};
|
||||
emit_event(
|
||||
emitter,
|
||||
&SoakEvent::Invariant {
|
||||
seed: cfg.gauntlet.seed.0,
|
||||
invariant: rec.invariant.clone(),
|
||||
detail: rec.detail.clone(),
|
||||
},
|
||||
)?;
|
||||
invariant_records.push(rec);
|
||||
}
|
||||
|
||||
let leak_violations = evaluate_leak_gate(&samples, cfg.leak_gate);
|
||||
let total_wall_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
let clean = invariant_records.is_empty() && leak_violations.is_empty();
|
||||
emit_event(
|
||||
emitter,
|
||||
&SoakEvent::Summary {
|
||||
seed: cfg.gauntlet.seed.0,
|
||||
total_wall_ms,
|
||||
ops_executed,
|
||||
op_errors,
|
||||
chunks,
|
||||
clean,
|
||||
invariant_violations: invariant_records.len(),
|
||||
leak_violations: leak_violations.clone(),
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(SoakReport {
|
||||
seed: cfg.gauntlet.seed,
|
||||
ops_executed,
|
||||
op_errors,
|
||||
chunks,
|
||||
samples,
|
||||
invariant_violations: invariant_records,
|
||||
leak_violations,
|
||||
total_wall_ms,
|
||||
})
|
||||
}
|
||||
|
||||
fn emit_event<W: Write>(emitter: &mut W, event: &SoakEvent) -> io::Result<()> {
|
||||
let line = serde_json::to_string(event).map_err(io::Error::other)?;
|
||||
writeln!(emitter, "{line}")?;
|
||||
emitter.flush()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn send_sync<T: Send + Sync>() {}
|
||||
|
||||
#[test]
|
||||
fn soak_error_is_send_sync() {
|
||||
send_sync::<SoakError>();
|
||||
}
|
||||
}
|
||||
119
crates/tranquil-store/tests/gauntlet_soak.rs
Normal file
119
crates/tranquil-store/tests/gauntlet_soak.rs
Normal file
@@ -0,0 +1,119 @@
|
||||
use std::io::{self, BufWriter, Write};
|
||||
use std::time::Duration;
|
||||
|
||||
use tranquil_store::gauntlet::{
|
||||
LeakGateConfig, Scenario, Seed, SoakConfig, SoakReport, config_for, run_soak,
|
||||
};
|
||||
|
||||
fn soak_hours() -> Option<f64> {
|
||||
std::env::var("GAUNTLET_SOAK_HOURS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<f64>().ok())
|
||||
.filter(|h| h.is_finite() && *h > 0.0)
|
||||
}
|
||||
|
||||
fn soak_sample_interval() -> Duration {
|
||||
std::env::var("GAUNTLET_SOAK_SAMPLE_SECS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse::<u64>().ok())
|
||||
.filter(|s| *s > 0)
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or_else(|| Duration::from_secs(60))
|
||||
}
|
||||
|
||||
fn emitter_stream() -> Box<dyn Write + Send> {
|
||||
match std::env::var("GAUNTLET_SOAK_OUTPUT").ok().as_deref() {
|
||||
Some(path) if !path.is_empty() => {
|
||||
let f = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)
|
||||
.expect("open GAUNTLET_SOAK_OUTPUT target");
|
||||
Box::new(BufWriter::new(f))
|
||||
}
|
||||
_ => Box::new(BufWriter::new(io::stderr())),
|
||||
}
|
||||
}
|
||||
|
||||
fn report_summary(report: &SoakReport) -> String {
|
||||
let leaks: Vec<String> = report
|
||||
.leak_violations
|
||||
.iter()
|
||||
.map(|v| {
|
||||
format!(
|
||||
"{}: {} -> {} ({}% over {}ms window, limit {}%)",
|
||||
v.metric,
|
||||
v.start_value,
|
||||
v.end_value,
|
||||
v.growth_pct.round() as i64,
|
||||
v.end_ms - v.start_ms,
|
||||
v.limit_pct
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let invariants: Vec<String> = report
|
||||
.invariant_violations
|
||||
.iter()
|
||||
.map(|v| format!("{}: {}", v.invariant, v.detail))
|
||||
.collect();
|
||||
format!(
|
||||
"seed={:016x} ops={} chunks={} errors={} wall_ms={} leaks=[{}] invariants=[{}]",
|
||||
report.seed.0,
|
||||
report.ops_executed,
|
||||
report.chunks,
|
||||
report.op_errors,
|
||||
report.total_wall_ms,
|
||||
leaks.join(" ; "),
|
||||
invariants.join(" ; "),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn soak_short_smoke() {
|
||||
let cfg = SoakConfig {
|
||||
gauntlet: config_for(Scenario::SmokePR, Seed(7)),
|
||||
total_duration: Duration::from_secs(10),
|
||||
sample_interval: Duration::from_secs(2),
|
||||
chunk_ops: 200,
|
||||
leak_gate: LeakGateConfig::try_new(0, 60_000, 1000.0).expect("valid leak gate"),
|
||||
};
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
let report = run_soak(cfg, &mut buf).await.expect("soak run");
|
||||
assert!(
|
||||
report.samples.len() >= 3,
|
||||
"expected at least initial + periodic + final samples, got {}",
|
||||
report.samples.len()
|
||||
);
|
||||
assert!(
|
||||
report.ops_executed > 0,
|
||||
"expected ops executed, got {}",
|
||||
report.ops_executed
|
||||
);
|
||||
let text = String::from_utf8(buf).expect("utf8 ndjson");
|
||||
assert!(
|
||||
text.contains("\"type\":\"summary\""),
|
||||
"ndjson must include summary line; got {text}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "configurable via GAUNTLET_SOAK_HOURS; default 24h leak gate (1h warmup, 4h window, 5% limit)"]
|
||||
async fn soak_long_leak_gate() {
|
||||
let hours = soak_hours().unwrap_or(24.0);
|
||||
let total = Duration::from_secs_f64(hours * 3600.0);
|
||||
let cfg = SoakConfig {
|
||||
gauntlet: config_for(Scenario::MstChurn, Seed(0)),
|
||||
total_duration: total,
|
||||
sample_interval: soak_sample_interval(),
|
||||
chunk_ops: 10_000,
|
||||
leak_gate: LeakGateConfig::standard(),
|
||||
};
|
||||
let mut emitter = emitter_stream();
|
||||
let report = run_soak(cfg, &mut emitter).await.expect("soak run");
|
||||
let _ = emitter.flush();
|
||||
assert!(
|
||||
report.is_clean(),
|
||||
"soak failed: {}",
|
||||
report_summary(&report)
|
||||
);
|
||||
}
|
||||
@@ -203,7 +203,7 @@ fn concurrent_reader_survives_evict_handle() {
|
||||
let write_handle = manager.open_for_append(file_id).unwrap();
|
||||
{
|
||||
let mut writer = DataFileWriter::new(&*sim, write_handle.fd(), file_id).unwrap();
|
||||
let _ = writer.append_block(&test_cid(1), &vec![0x11; 128]).unwrap();
|
||||
let _ = writer.append_block(&test_cid(1), &[0x11; 128]).unwrap();
|
||||
writer.sync().unwrap();
|
||||
}
|
||||
drop(write_handle);
|
||||
|
||||
@@ -517,8 +517,8 @@ fn sim_aggressive_faults_data_integrity() {
|
||||
return;
|
||||
};
|
||||
let start_pos = writer.position();
|
||||
drop(writer);
|
||||
drop(handle);
|
||||
let _ = writer;
|
||||
let _ = handle;
|
||||
|
||||
let mut rng = Rng::new(seed);
|
||||
let block_count = (rng.range_u32(15) + 5) as u16;
|
||||
|
||||
@@ -64,8 +64,8 @@ fn rollback_rotation_does_not_leave_orphan_data_file() {
|
||||
io.sync_dir(&data_dir).unwrap();
|
||||
|
||||
let _ = io.delete(&hint_file_path(&data_dir, next_id));
|
||||
drop(writer);
|
||||
drop(next_handle);
|
||||
let _ = writer;
|
||||
let _ = next_handle;
|
||||
manager.rollback_rotation(next_id);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[server]
|
||||
# Public hostname of the PDS (e.g. `pds.example.com`).
|
||||
# Public hostname of the PDS, such as `pds.example.com`.
|
||||
#
|
||||
# Can also be specified via environment variable `PDS_HOSTNAME`.
|
||||
#
|
||||
|
||||
Reference in New Issue
Block a user