diff --git a/Cargo.lock b/Cargo.lock index 19d4db1..9e00762 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7405,7 +7405,7 @@ dependencies = [ [[package]] name = "tranquil-api" -version = "0.5.4" +version = "0.5.5" dependencies = [ "anyhow", "axum", @@ -7456,7 +7456,7 @@ dependencies = [ [[package]] name = "tranquil-auth" -version = "0.5.4" +version = "0.5.5" dependencies = [ "anyhow", "base32", @@ -7479,7 +7479,7 @@ dependencies = [ [[package]] name = "tranquil-cache" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "base64 0.22.1", @@ -7493,7 +7493,7 @@ dependencies = [ [[package]] name = "tranquil-comms" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "base64 0.22.1", @@ -7511,7 +7511,7 @@ dependencies = [ [[package]] name = "tranquil-config" -version = "0.5.4" +version = "0.5.5" dependencies = [ "confique", "serde", @@ -7519,7 +7519,7 @@ dependencies = [ [[package]] name = "tranquil-crypto" -version = "0.5.4" +version = "0.5.5" dependencies = [ "aes-gcm", "base64 0.22.1", @@ -7535,7 +7535,7 @@ dependencies = [ [[package]] name = "tranquil-db" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "chrono", @@ -7552,7 +7552,7 @@ dependencies = [ [[package]] name = "tranquil-db-traits" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "base64 0.22.1", @@ -7568,7 +7568,7 @@ dependencies = [ [[package]] name = "tranquil-infra" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "bytes", @@ -7579,9 +7579,10 @@ dependencies = [ [[package]] name = "tranquil-lexicon" -version = "0.5.4" +version = "0.5.5" dependencies = [ "chrono", + "futures", "hickory-resolver", "parking_lot", "reqwest", @@ -7597,7 +7598,7 @@ dependencies = [ [[package]] name = "tranquil-oauth" -version = "0.5.4" +version = "0.5.5" dependencies = [ "anyhow", "axum", @@ -7620,7 +7621,7 @@ dependencies = [ [[package]] name = "tranquil-oauth-server" -version = "0.5.4" +version = "0.5.5" dependencies = [ "axum", "base64 0.22.1", @@ -7653,7 +7654,7 @@ dependencies = [ [[package]] name = "tranquil-pds" -version = "0.5.4" +version = "0.5.5" dependencies = [ "aes-gcm", "anyhow", @@ -7745,7 +7746,7 @@ dependencies = [ [[package]] name = "tranquil-repo" -version = "0.5.4" +version = "0.5.5" dependencies = [ "bytes", "cid", @@ -7757,7 +7758,7 @@ dependencies = [ [[package]] name = "tranquil-ripple" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "backon", @@ -7782,7 +7783,7 @@ dependencies = [ [[package]] name = "tranquil-scopes" -version = "0.5.4" +version = "0.5.5" dependencies = [ "axum", "futures", @@ -7798,7 +7799,7 @@ dependencies = [ [[package]] name = "tranquil-server" -version = "0.5.4" +version = "0.5.5" dependencies = [ "axum", "clap", @@ -7819,7 +7820,7 @@ dependencies = [ [[package]] name = "tranquil-signal" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "chrono", @@ -7842,7 +7843,7 @@ dependencies = [ [[package]] name = "tranquil-storage" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "aws-config", @@ -7859,7 +7860,7 @@ dependencies = [ [[package]] name = "tranquil-store" -version = "0.5.4" +version = "0.5.5" dependencies = [ "async-trait", "bytes", @@ -7906,7 +7907,7 @@ dependencies = [ [[package]] name = "tranquil-sync" -version = "0.5.4" +version = "0.5.5" dependencies = [ "anyhow", "axum", @@ -7928,7 +7929,7 @@ dependencies = [ [[package]] name = "tranquil-types" -version = "0.5.4" +version = "0.5.5" dependencies = [ "chrono", "cid", diff --git a/Cargo.toml b/Cargo.toml index 4b00bee..9117f0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ members = [ ] [workspace.package] -version = "0.5.4" +version = "0.5.5" edition = "2024" license = "AGPL-3.0-or-later" diff --git a/crates/tranquil-lexicon/Cargo.toml b/crates/tranquil-lexicon/Cargo.toml index 6a9a037..18aa29a 100644 --- a/crates/tranquil-lexicon/Cargo.toml +++ b/crates/tranquil-lexicon/Cargo.toml @@ -24,3 +24,4 @@ urlencoding = { workspace = true, optional = true } [dev-dependencies] wiremock = { workspace = true } tokio = { workspace = true } +futures = { workspace = true } diff --git a/crates/tranquil-lexicon/src/dynamic.rs b/crates/tranquil-lexicon/src/dynamic.rs index 0942879..a9743c1 100644 --- a/crates/tranquil-lexicon/src/dynamic.rs +++ b/crates/tranquil-lexicon/src/dynamic.rs @@ -5,25 +5,60 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; +use tokio::sync::Notify; const NEGATIVE_CACHE_TTL: Duration = Duration::from_secs(24 * 60 * 60); +const POSITIVE_CACHE_TTL: Duration = Duration::from_secs(24 * 60 * 60); +const REFRESH_FAILURE_BACKOFF: Duration = Duration::from_secs(60); const MAX_DYNAMIC_SCHEMAS: usize = 1024; struct NegativeEntry { expires_at: Instant, } +struct PositiveEntry { + doc: Arc, + expires_at: Instant, +} + +pub(crate) enum CacheEntry { + Fresh(Arc), + Stale(Arc), +} + +impl CacheEntry { + #[cfg(test)] + fn is_fresh(&self) -> bool { + matches!(self, Self::Fresh(_)) + } +} + struct SchemaStore { - schemas: HashMap>, + schemas: HashMap, insertion_order: VecDeque, } pub struct DynamicRegistry { store: RwLock, negative_cache: RwLock>, + in_flight: RwLock>>, network_disabled: AtomicBool, } +struct InFlightGuard<'a> { + registry: &'a DynamicRegistry, + nsid: String, +} + +impl Drop for InFlightGuard<'_> { + fn drop(&mut self) { + let notify = self.registry.in_flight.write().remove(&self.nsid); + if let Some(n) = notify { + n.notify_waiters(); + } + } +} + impl DynamicRegistry { pub fn new() -> Self { let network_disabled = @@ -34,6 +69,7 @@ impl DynamicRegistry { insertion_order: VecDeque::new(), }), negative_cache: RwLock::new(HashMap::new()), + in_flight: RwLock::new(HashMap::new()), network_disabled: AtomicBool::new(network_disabled), } } @@ -43,8 +79,23 @@ impl DynamicRegistry { self.network_disabled.store(disabled, Ordering::Relaxed); } - pub fn get(&self, nsid: &str) -> Option> { - self.store.read().schemas.get(nsid).cloned() + pub fn get_cached(&self, nsid: &str) -> Option> { + self.store + .read() + .schemas + .get(nsid) + .map(|e| Arc::clone(&e.doc)) + } + + pub(crate) fn get_entry(&self, nsid: &str) -> Option { + let now = Instant::now(); + self.store.read().schemas.get(nsid).map(|e| { + if e.expires_at > now { + CacheEntry::Fresh(Arc::clone(&e.doc)) + } else { + CacheEntry::Stale(Arc::clone(&e.doc)) + } + }) } pub fn is_negative_cached(&self, nsid: &str) -> bool { @@ -56,7 +107,7 @@ impl DynamicRegistry { fn insert_negative(&self, nsid: &str) { let mut cache = self.negative_cache.write(); - if cache.len() > MAX_DYNAMIC_SCHEMAS { + if cache.len() >= MAX_DYNAMIC_SCHEMAS { let now = Instant::now(); cache.retain(|_, entry| entry.expires_at > now); } @@ -87,29 +138,95 @@ impl DynamicRegistry { }); } - if store - .schemas - .insert(nsid.clone(), Arc::clone(&arc)) - .is_some() - { + let entry = PositiveEntry { + doc: Arc::clone(&arc), + expires_at: Instant::now() + POSITIVE_CACHE_TTL, + }; + if store.schemas.insert(nsid.clone(), entry).is_some() { store.insertion_order.retain(|k| k != &nsid); } store.insertion_order.push_back(nsid.clone()); + drop(store); self.negative_cache.write().remove(&arc.id); arc } + fn bump_expiry(&self, nsid: &str, duration: Duration) { + let mut store = self.store.write(); + if let Some(entry) = store.schemas.get_mut(nsid) { + entry.expires_at = Instant::now() + duration; + } + } + pub async fn resolve_and_cache(&self, nsid: &str) -> Result, ResolveError> { - if let Some(doc) = self.get(nsid) { - return Ok(doc); + self.resolve_and_cache_with(nsid, |n| async move { resolve_lexicon(&n).await }) + .await + } + + async fn resolve_and_cache_with( + &self, + nsid: &str, + resolver: F, + ) -> Result, ResolveError> + where + F: FnOnce(String) -> Fut, + Fut: std::future::Future>, + { + match self.get_entry(nsid) { + Some(CacheEntry::Fresh(doc)) => Ok(doc), + Some(CacheEntry::Stale(stale)) => self.refresh_stale(nsid, stale, resolver).await, + None => self.resolve_fresh(nsid, resolver).await, + } + } + + async fn refresh_stale( + &self, + nsid: &str, + stale: Arc, + resolver: F, + ) -> Result, ResolveError> + where + F: FnOnce(String) -> Fut, + Fut: std::future::Future>, + { + if self.network_disabled.load(Ordering::Relaxed) { + return Ok(stale); } + match self.acquire_leadership(nsid) { + Some(_guard) => match resolver(nsid.to_string()).await { + Ok(doc) => Ok(self.insert_schema(doc)), + Err(e) => { + self.bump_expiry(nsid, REFRESH_FAILURE_BACKOFF); + tracing::warn!( + nsid = nsid, + error = %e, + "lexicon refresh failed, serving stale cached entry" + ); + Ok(stale) + } + }, + None => { + self.wait_for_leader(nsid).await; + Ok(self.get_cached(nsid).unwrap_or(stale)) + } + } + } + + async fn resolve_fresh( + &self, + nsid: &str, + resolver: F, + ) -> Result, ResolveError> + where + F: FnOnce(String) -> Fut, + Fut: std::future::Future>, + { if self.network_disabled.load(Ordering::Relaxed) { return Err(ResolveError::NetworkDisabled); } - if self.is_negative_cached(nsid) { return Err(ResolveError::NegativelyCached { nsid: nsid.to_string(), @@ -117,19 +234,75 @@ impl DynamicRegistry { }); } - match resolve_lexicon(nsid).await { - Ok(doc) => Ok(self.insert_schema(doc)), - Err(e) => { - tracing::debug!(nsid = nsid, error = %e, "caching negative resolution result"); - self.insert_negative(nsid); - Err(e) + match self.acquire_leadership(nsid) { + Some(_guard) => match resolver(nsid.to_string()).await { + Ok(doc) => Ok(self.insert_schema(doc)), + Err(e) => { + self.insert_negative(nsid); + tracing::debug!(nsid = nsid, error = %e, "caching negative resolution result"); + Err(e) + } + }, + None => { + self.wait_for_leader(nsid).await; + match self.get_cached(nsid) { + Some(doc) => Ok(doc), + None if self.is_negative_cached(nsid) => { + Err(ResolveError::NegativelyCached { + nsid: nsid.to_string(), + ttl_secs: NEGATIVE_CACHE_TTL.as_secs(), + }) + } + None => Err(ResolveError::LeaderAborted { + nsid: nsid.to_string(), + }), + } } } } + fn acquire_leadership(&self, nsid: &str) -> Option> { + let mut map = self.in_flight.write(); + if map.contains_key(nsid) { + None + } else { + map.insert(nsid.to_string(), Arc::new(Notify::new())); + Some(InFlightGuard { + registry: self, + nsid: nsid.to_string(), + }) + } + } + + async fn wait_for_leader(&self, nsid: &str) { + let notify = { + let map = self.in_flight.read(); + match map.get(nsid) { + Some(n) => Arc::clone(n), + None => return, + } + }; + let notified = notify.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + let still_active = self.in_flight.read().contains_key(nsid); + if !still_active { + return; + } + notified.as_mut().await; + } + pub fn schema_count(&self) -> usize { self.store.read().schemas.len() } + + #[cfg(test)] + fn expire_now(&self, nsid: &str) { + let mut store = self.store.write(); + if let Some(entry) = store.schemas.get_mut(nsid) { + entry.expires_at = Instant::now(); + } + } } impl Default for DynamicRegistry { @@ -171,7 +344,7 @@ mod tests { #[test] fn test_empty_lookup() { let registry = DynamicRegistry::new(); - assert!(registry.get("com.example.nonexistent").is_none()); + assert!(registry.get_cached("com.example.nonexistent").is_none()); assert_eq!(registry.schema_count(), 0); } @@ -188,9 +361,12 @@ mod tests { assert_eq!(arc.id, "com.example.test"); assert_eq!(registry.schema_count(), 1); - let retrieved = registry.get("com.example.test"); + let retrieved = registry.get_cached("com.example.test"); assert!(retrieved.is_some()); assert_eq!(retrieved.unwrap().id, "com.example.test"); + + let entry = registry.get_entry("com.example.test").unwrap(); + assert!(entry.is_fresh(), "freshly inserted entry must be fresh"); } #[test] @@ -210,6 +386,231 @@ mod tests { assert!(!registry.is_negative_cached("com.example.test")); } + #[test] + fn test_positive_entry_reports_stale_after_ttl() { + let registry = DynamicRegistry::new(); + let doc = LexiconDoc { + lexicon: 1, + id: "pet.nel.stale".to_string(), + defs: HashMap::new(), + }; + registry.insert_schema(doc); + + assert!(registry.get_entry("pet.nel.stale").unwrap().is_fresh()); + + registry.expire_now("pet.nel.stale"); + + assert!( + !registry.get_entry("pet.nel.stale").unwrap().is_fresh(), + "entry past expiry must be reported stale" + ); + } + + #[tokio::test] + async fn test_stale_served_on_resolve_failure() { + let registry = DynamicRegistry::new(); + let doc = LexiconDoc { + lexicon: 1, + id: "pet.nel.flaky".to_string(), + defs: HashMap::new(), + }; + registry.insert_schema(doc); + registry.expire_now("pet.nel.flaky"); + + let result = registry + .resolve_and_cache_with("pet.nel.flaky", |n| async move { + Err::(ResolveError::DnsLookup { + domain: n, + reason: "simulated failure".to_string(), + }) + }) + .await; + + let served = result.expect("stale entry must be served when refresh fails"); + assert_eq!(served.id, "pet.nel.flaky"); + assert!( + registry + .get_entry("pet.nel.flaky") + .unwrap() + .is_fresh(), + "failed refresh must bump expiry so subsequent lookups skip the resolver" + ); + assert!( + !registry.is_negative_cached("pet.nel.flaky"), + "stale refresh failure must not poison negative cache" + ); + } + + #[tokio::test] + async fn test_fresh_hit_skips_resolver() { + let registry = DynamicRegistry::new(); + let doc = LexiconDoc { + lexicon: 1, + id: "pet.nel.fresh".to_string(), + defs: HashMap::new(), + }; + registry.insert_schema(doc); + + let result = registry + .resolve_and_cache_with("pet.nel.fresh", |_| async move { + panic!("resolver must not run on fresh hit") + }) + .await; + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_stale_served_when_network_disabled() { + let registry = DynamicRegistry::new(); + let doc = LexiconDoc { + lexicon: 1, + id: "pet.nel.offline".to_string(), + defs: HashMap::new(), + }; + registry.insert_schema(doc); + registry.expire_now("pet.nel.offline"); + registry.set_network_disabled(true); + + let result = registry + .resolve_and_cache_with("pet.nel.offline", |_| async move { + panic!("resolver must not run when network disabled") + }) + .await; + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_successful_refresh_updates_cached_at() { + let registry = DynamicRegistry::new(); + let doc = LexiconDoc { + lexicon: 1, + id: "pet.nel.refresh".to_string(), + defs: HashMap::new(), + }; + registry.insert_schema(doc); + registry.expire_now("pet.nel.refresh"); + + assert!( + !registry + .get_entry("pet.nel.refresh") + .unwrap() + .is_fresh() + ); + + let refreshed = registry + .resolve_and_cache_with("pet.nel.refresh", |n| async move { + Ok(LexiconDoc { + lexicon: 1, + id: n, + defs: HashMap::new(), + }) + }) + .await + .unwrap(); + + assert_eq!(refreshed.id, "pet.nel.refresh"); + assert!( + registry + .get_entry("pet.nel.refresh") + .unwrap() + .is_fresh(), + "refresh must restore freshness" + ); + } + + #[tokio::test] + async fn test_single_flight_dedups_concurrent_resolves() { + use std::sync::atomic::AtomicUsize; + let registry = Arc::new(DynamicRegistry::new()); + let calls = Arc::new(AtomicUsize::new(0)); + + let tasks: Vec<_> = (0..16) + .map(|_| { + let registry = Arc::clone(®istry); + let calls = Arc::clone(&calls); + tokio::spawn(async move { + registry + .resolve_and_cache_with("pet.nel.herd", |n| { + let calls = Arc::clone(&calls); + async move { + calls.fetch_add(1, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(50)).await; + Ok(LexiconDoc { + lexicon: 1, + id: n, + defs: HashMap::new(), + }) + } + }) + .await + }) + }) + .collect(); + + let results = futures_collect(tasks).await; + results + .iter() + .for_each(|r| assert!(r.is_ok(), "all single-flight callers must succeed")); + assert_eq!( + calls.load(Ordering::SeqCst), + 1, + "single-flight must coalesce concurrent resolves" + ); + assert_eq!(registry.schema_count(), 1); + } + + #[tokio::test] + async fn test_single_flight_followers_observe_leader_failure() { + use std::sync::atomic::AtomicUsize; + let registry = Arc::new(DynamicRegistry::new()); + let calls = Arc::new(AtomicUsize::new(0)); + + let tasks: Vec<_> = (0..8) + .map(|_| { + let registry = Arc::clone(®istry); + let calls = Arc::clone(&calls); + tokio::spawn(async move { + registry + .resolve_and_cache_with("pet.nel.failHerd", |n| { + let calls = Arc::clone(&calls); + async move { + calls.fetch_add(1, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(50)).await; + Err::(ResolveError::DnsLookup { + domain: n, + reason: "simulated".to_string(), + }) + } + }) + .await + }) + }) + .collect(); + + let results = futures_collect(tasks).await; + results + .iter() + .for_each(|r| assert!(r.is_err(), "all followers must observe leader failure")); + assert_eq!( + calls.load(Ordering::SeqCst), + 1, + "single-flight must coalesce failing resolves too" + ); + assert!(registry.is_negative_cached("pet.nel.failHerd")); + } + + async fn futures_collect( + handles: Vec>, + ) -> Vec { + futures::future::join_all(handles) + .await + .into_iter() + .map(|r| r.expect("task panicked")) + .collect() + } + #[test] fn test_eviction_is_fifo() { let registry = DynamicRegistry::new(); @@ -217,7 +618,7 @@ mod tests { (0..MAX_DYNAMIC_SCHEMAS).for_each(|i| { let doc = LexiconDoc { lexicon: 1, - id: format!("com.example.schema{}", i), + id: format!("pet.nel.schema{}", i), defs: HashMap::new(), }; registry.insert_schema(doc); @@ -226,23 +627,23 @@ mod tests { let trigger = LexiconDoc { lexicon: 1, - id: "com.example.trigger".to_string(), + id: "pet.nel.trigger".to_string(), defs: HashMap::new(), }; registry.insert_schema(trigger); assert!( - registry.get("com.example.schema0").is_none(), + registry.get_cached("pet.nel.schema0").is_none(), "oldest entry should be evicted" ); assert!( - registry.get("com.example.trigger").is_some(), + registry.get_cached("pet.nel.trigger").is_some(), "newly inserted entry should exist" ); let evict_count = MAX_DYNAMIC_SCHEMAS / 4; assert!( registry - .get(&format!("com.example.schema{}", evict_count)) + .get_cached(&format!("pet.nel.schema{}", evict_count)) .is_some(), "entry after eviction window should survive" ); @@ -253,7 +654,7 @@ mod tests { let registry = DynamicRegistry::new(); let doc = LexiconDoc { lexicon: 1, - id: "com.example.tracked".to_string(), + id: "pet.nel.tracked".to_string(), defs: HashMap::new(), }; let arc = registry.insert_schema(doc); @@ -265,7 +666,7 @@ mod tests { (0..MAX_DYNAMIC_SCHEMAS).for_each(|i| { registry.insert_schema(LexiconDoc { lexicon: 1, - id: format!("com.example.filler{}", i), + id: format!("pet.nel.filler{}", i), defs: HashMap::new(), }); }); diff --git a/crates/tranquil-lexicon/src/registry.rs b/crates/tranquil-lexicon/src/registry.rs index e6f26fe..083084d 100644 --- a/crates/tranquil-lexicon/src/registry.rs +++ b/crates/tranquil-lexicon/src/registry.rs @@ -43,7 +43,7 @@ impl LexiconRegistry { self.schemas.get(nsid).cloned().or_else(|| { #[cfg(feature = "resolve")] { - self.dynamic.get(nsid) + self.dynamic.get_cached(nsid) } #[cfg(not(feature = "resolve"))] { diff --git a/crates/tranquil-lexicon/src/resolve.rs b/crates/tranquil-lexicon/src/resolve.rs index 3180260..90d1536 100644 --- a/crates/tranquil-lexicon/src/resolve.rs +++ b/crates/tranquil-lexicon/src/resolve.rs @@ -70,6 +70,8 @@ pub enum ResolveError { NegativelyCached { nsid: String, ttl_secs: u64 }, #[error("network resolution disabled")] NetworkDisabled, + #[error("leader task for {nsid} aborted before completion")] + LeaderAborted { nsid: String }, } pub fn nsid_to_authority(nsid: &str) -> Result {