From 07c8bf5b7f437c8b5d692200b2a75b5d2b2e8af3 Mon Sep 17 00:00:00 2001 From: lewis Date: Sun, 8 Feb 2026 12:02:29 +0200 Subject: [PATCH] feat: cache locks less --- .config/nextest.toml | 8 + Cargo.lock | 46 +-- crates/tranquil-ripple/Cargo.toml | 2 + crates/tranquil-ripple/src/cache.rs | 45 ++- crates/tranquil-ripple/src/config.rs | 2 +- crates/tranquil-ripple/src/crdt/g_counter.rs | 13 + crates/tranquil-ripple/src/crdt/hlc.rs | 11 +- crates/tranquil-ripple/src/crdt/lww_map.rs | 103 ++++-- crates/tranquil-ripple/src/crdt/mod.rs | 348 ++++++++++++++---- crates/tranquil-ripple/src/engine.rs | 20 +- crates/tranquil-ripple/src/eviction.rs | 74 ++-- crates/tranquil-ripple/src/gossip.rs | 251 +++++++++---- crates/tranquil-ripple/src/lib.rs | 1 + crates/tranquil-ripple/src/metrics.rs | 108 ++++++ crates/tranquil-ripple/src/rate_limiter.rs | 15 +- crates/tranquil-ripple/src/transport.rs | 81 +++- .../tests/two_node_convergence.rs | 160 ++++++++ 17 files changed, 1005 insertions(+), 283 deletions(-) create mode 100644 crates/tranquil-ripple/src/metrics.rs diff --git a/.config/nextest.toml b/.config/nextest.toml index ffb316f..ed0b14f 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -29,6 +29,10 @@ test-group = "serial-env-tests" filter = "binary(whole_story)" test-group = "heavy-load-tests" +[[profile.default.overrides]] +filter = "test(/two_node_stress_concurrent_load/)" +test-group = "heavy-load-tests" + [[profile.ci.overrides]] filter = "test(/import_with_verification/) | test(/plc_migration/)" test-group = "serial-env-tests" @@ -40,3 +44,7 @@ test-group = "serial-env-tests" [[profile.ci.overrides]] filter = "binary(whole_story)" test-group = "heavy-load-tests" + +[[profile.ci.overrides]] +filter = "test(/two_node_stress_concurrent_load/)" +test-group = "heavy-load-tests" diff --git a/Cargo.lock b/Cargo.lock index e4cb79c..8b33e22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2778,7 +2778,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -4242,7 +4242,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.35", - "socket2 0.6.1", + "socket2 0.5.10", "thiserror 2.0.17", "tokio", "tracing", @@ -4279,7 +4279,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.1", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] @@ -4402,7 +4402,7 @@ dependencies = [ "pin-project-lite", "ryu", "sha1_smol", - "socket2 0.6.1", + "socket2 0.6.2", "tokio", "tokio-util", "url", @@ -5134,9 +5134,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" dependencies = [ "libc", "windows-sys 0.60.2", @@ -5693,7 +5693,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.1", + "socket2 0.6.2", "tokio-macros", "windows-sys 0.61.2", ] @@ -5797,7 +5797,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "socket2 0.6.1", + "socket2 0.6.2", "sync_wrapper", "tokio", "tokio-stream", @@ -5963,7 +5963,7 @@ dependencies = [ [[package]] name = "tranquil-auth" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "base32", @@ -5985,7 +5985,7 @@ dependencies = [ [[package]] name = "tranquil-cache" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", 
"base64 0.22.1", @@ -5998,7 +5998,7 @@ dependencies = [ [[package]] name = "tranquil-comms" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "base64 0.22.1", @@ -6012,7 +6012,7 @@ dependencies = [ [[package]] name = "tranquil-crypto" -version = "0.1.0" +version = "0.2.0" dependencies = [ "aes-gcm", "base64 0.22.1", @@ -6028,7 +6028,7 @@ dependencies = [ [[package]] name = "tranquil-db" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "chrono", @@ -6045,7 +6045,7 @@ dependencies = [ [[package]] name = "tranquil-db-traits" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "base64 0.22.1", @@ -6061,7 +6061,7 @@ dependencies = [ [[package]] name = "tranquil-infra" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "bytes", @@ -6071,7 +6071,7 @@ dependencies = [ [[package]] name = "tranquil-oauth" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "axum", @@ -6094,7 +6094,7 @@ dependencies = [ [[package]] name = "tranquil-pds" -version = "0.1.0" +version = "0.2.0" dependencies = [ "aes-gcm", "anyhow", @@ -6179,7 +6179,7 @@ dependencies = [ [[package]] name = "tranquil-repo" -version = "0.1.0" +version = "0.2.0" dependencies = [ "bytes", "cid", @@ -6191,7 +6191,7 @@ dependencies = [ [[package]] name = "tranquil-ripple" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "backon", @@ -6199,9 +6199,11 @@ dependencies = [ "bytes", "foca", "futures", + "metrics", "parking_lot", "rand 0.9.2", "serde", + "socket2 0.6.2", "thiserror 2.0.17", "tokio", "tokio-util", @@ -6213,7 +6215,7 @@ dependencies = [ [[package]] name = "tranquil-scopes" -version = "0.1.0" +version = "0.2.0" dependencies = [ "axum", "futures", @@ -6228,7 +6230,7 @@ dependencies = [ [[package]] name = "tranquil-storage" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "aws-config", @@ -6244,7 +6246,7 @@ dependencies = [ [[package]] name = "tranquil-types" -version = "0.1.0" +version = "0.2.0" dependencies = [ "chrono", "cid", diff --git a/crates/tranquil-ripple/Cargo.toml b/crates/tranquil-ripple/Cargo.toml index ed8436f..660b00c 100644 --- a/crates/tranquil-ripple/Cargo.toml +++ b/crates/tranquil-ripple/Cargo.toml @@ -12,9 +12,11 @@ backon = { workspace = true } bincode = { workspace = true } bytes = { workspace = true } foca = { workspace = true } +metrics = { workspace = true } parking_lot = { workspace = true } rand = "0.9" serde = { workspace = true } +socket2 = "0.6.2" thiserror = { workspace = true } tokio = { workspace = true, features = ["net", "io-util", "sync", "time"] } tokio-util = { workspace = true } diff --git a/crates/tranquil-ripple/src/cache.rs b/crates/tranquil-ripple/src/cache.rs index 64102ba..d37cc87 100644 --- a/crates/tranquil-ripple/src/cache.rs +++ b/crates/tranquil-ripple/src/cache.rs @@ -1,16 +1,16 @@ -use crate::crdt::CrdtStore; +use crate::crdt::ShardedCrdtStore; +use crate::metrics; use async_trait::async_trait; -use parking_lot::RwLock; use std::sync::Arc; use std::time::Duration; use tranquil_infra::{Cache, CacheError}; pub struct RippleCache { - store: Arc>, + store: Arc, } impl RippleCache { - pub fn new(store: Arc>) -> Self { + pub fn new(store: Arc) -> Self { Self { store } } } @@ -18,32 +18,45 @@ impl RippleCache { #[async_trait] impl Cache for RippleCache { async fn get(&self, key: &str) -> Option { - self.store - .read() + let result = self + .store .cache_get(key) - .and_then(|bytes| String::from_utf8(bytes).ok()) + .and_then(|bytes| 
String::from_utf8(bytes).ok()); + match result.is_some() { + true => metrics::record_cache_hit(), + false => metrics::record_cache_miss(), + } + result } async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> { + let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX); self.store - .write() - .cache_set(key.to_string(), value.as_bytes().to_vec(), ttl.as_millis() as u64); + .cache_set(key.to_string(), value.as_bytes().to_vec(), ttl_ms); + metrics::record_cache_write(); Ok(()) } async fn delete(&self, key: &str) -> Result<(), CacheError> { - self.store.write().cache_delete(key); + self.store.cache_delete(key); + metrics::record_cache_delete(); Ok(()) } async fn get_bytes(&self, key: &str) -> Option> { - self.store.read().cache_get(key) + let result = self.store.cache_get(key); + match result.is_some() { + true => metrics::record_cache_hit(), + false => metrics::record_cache_miss(), + } + result } async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> { + let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX); self.store - .write() - .cache_set(key.to_string(), value.to_vec(), ttl.as_millis() as u64); + .cache_set(key.to_string(), value.to_vec(), ttl_ms); + metrics::record_cache_write(); Ok(()) } } @@ -54,7 +67,7 @@ mod tests { #[tokio::test] async fn cache_trait_roundtrip() { - let store = Arc::new(RwLock::new(CrdtStore::new(1))); + let store = Arc::new(ShardedCrdtStore::new(1)); let cache = RippleCache::new(store); cache .set("test", "value", Duration::from_secs(60)) @@ -65,7 +78,7 @@ mod tests { #[tokio::test] async fn cache_trait_bytes() { - let store = Arc::new(RwLock::new(CrdtStore::new(1))); + let store = Arc::new(ShardedCrdtStore::new(1)); let cache = RippleCache::new(store); let data = vec![0xDE, 0xAD, 0xBE, 0xEF]; cache @@ -77,7 +90,7 @@ mod tests { #[tokio::test] async fn cache_trait_delete() { - let store = Arc::new(RwLock::new(CrdtStore::new(1))); + let store = Arc::new(ShardedCrdtStore::new(1)); let cache = RippleCache::new(store); cache .set("del", "x", Duration::from_secs(60)) diff --git a/crates/tranquil-ripple/src/config.rs b/crates/tranquil-ripple/src/config.rs index 683303e..f770efa 100644 --- a/crates/tranquil-ripple/src/config.rs +++ b/crates/tranquil-ripple/src/config.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -fn fnv1a(data: &[u8]) -> u64 { +pub(crate) fn fnv1a(data: &[u8]) -> u64 { data.iter().fold(0xcbf29ce484222325u64, |hash, &byte| { (hash ^ byte as u64).wrapping_mul(0x100000001b3) }) diff --git a/crates/tranquil-ripple/src/crdt/g_counter.rs b/crates/tranquil-ripple/src/crdt/g_counter.rs index 2fcd301..2a88647 100644 --- a/crates/tranquil-ripple/src/crdt/g_counter.rs +++ b/crates/tranquil-ripple/src/crdt/g_counter.rs @@ -163,6 +163,9 @@ impl RateLimitStore { } pub fn peek_count(&self, key: &str, window_ms: u64, now_wall_ms: u64) -> u64 { + if window_ms == 0 { + return 0; + } match self.counters.get(key) { Some(counter) if counter.window_start_ms == Self::aligned_window_start(now_wall_ms, window_ms) => { counter.total() @@ -195,6 +198,16 @@ impl RateLimitStore { .fold(0usize, usize::saturating_add) } + pub fn extract_all_deltas(&self) -> Vec { + self.counters + .iter() + .map(|(key, counter)| GCounterDelta { + key: key.clone(), + counter: counter.clone(), + }) + .collect() + } + pub fn gc_expired(&mut self, now_wall_ms: u64) { let expired: Vec = self .counters diff --git a/crates/tranquil-ripple/src/crdt/hlc.rs b/crates/tranquil-ripple/src/crdt/hlc.rs index 9d73507..9748efa 100644 
--- a/crates/tranquil-ripple/src/crdt/hlc.rs +++ b/crates/tranquil-ripple/src/crdt/hlc.rs @@ -54,10 +54,13 @@ impl Hlc { } fn physical_now() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64 + u64::try_from( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(), + ) + .unwrap_or(u64::MAX) } pub fn now(&mut self) -> HlcTimestamp { diff --git a/crates/tranquil-ripple/src/crdt/lww_map.rs b/crates/tranquil-ripple/src/crdt/lww_map.rs index cf43123..cfea9eb 100644 --- a/crates/tranquil-ripple/src/crdt/lww_map.rs +++ b/crates/tranquil-ripple/src/crdt/lww_map.rs @@ -1,5 +1,4 @@ use super::hlc::HlcTimestamp; -use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap}; @@ -27,11 +26,37 @@ impl LwwEntry { } fn entry_byte_size(&self, key: &str) -> usize { - const OVERHEAD: usize = 128; - key.len() - + self.value.as_ref().map_or(0, Vec::len) - + std::mem::size_of::() - + OVERHEAD + const HASHMAP_ENTRY_OVERHEAD: usize = 64; + const BTREE_NODE_OVERHEAD: usize = 64; + const STRING_HEADER: usize = 24; + const COUNTER_SIZE: usize = 8; + + let key_len = key.len(); + let value_len = self.value.as_ref().map_or(0, Vec::len); + + let main_entry = key_len + .saturating_add(value_len) + .saturating_add(std::mem::size_of::()) + .saturating_add(HASHMAP_ENTRY_OVERHEAD); + + match self.is_tombstone() { + true => main_entry, + false => { + let lru_btree = COUNTER_SIZE + .saturating_add(STRING_HEADER) + .saturating_add(key_len) + .saturating_add(BTREE_NODE_OVERHEAD); + + let lru_hashmap = STRING_HEADER + .saturating_add(key_len) + .saturating_add(COUNTER_SIZE) + .saturating_add(HASHMAP_ENTRY_OVERHEAD); + + main_entry + .saturating_add(lru_btree) + .saturating_add(lru_hashmap) + } + } } } @@ -59,11 +84,26 @@ impl LruTracker { if let Some(old_counter) = self.key_to_counter.remove(key) { self.counter_to_key.remove(&old_counter); } + if self.counter >= u64::MAX - 1 { + self.compact(); + } self.counter = self.counter.saturating_add(1); self.counter_to_key.insert(self.counter, key.to_string()); self.key_to_counter.insert(key.to_string(), self.counter); } + fn compact(&mut self) { + let keys: Vec = self.counter_to_key.values().cloned().collect(); + self.counter_to_key.clear(); + self.key_to_counter.clear(); + keys.into_iter().enumerate().for_each(|(i, key)| { + let new_counter = (i as u64).saturating_add(1); + self.counter_to_key.insert(new_counter, key.clone()); + self.key_to_counter.insert(key, new_counter); + }); + self.counter = self.counter_to_key.len() as u64; + } + fn remove(&mut self, key: &str) { if let Some(counter) = self.key_to_counter.remove(key) { self.counter_to_key.remove(&counter); @@ -80,7 +120,7 @@ impl LruTracker { pub struct LwwMap { entries: HashMap, - lru: Mutex, + lru: LruTracker, estimated_bytes: usize, } @@ -88,7 +128,7 @@ impl LwwMap { pub fn new() -> Self { Self { entries: HashMap::new(), - lru: Mutex::new(LruTracker::new()), + lru: LruTracker::new(), estimated_bytes: 0, } } @@ -98,9 +138,7 @@ impl LwwMap { if entry.is_expired(now_wall_ms) || entry.is_tombstone() { return None; } - let value = entry.value.clone(); - self.lru.lock().promote(key); - value + entry.value.clone() } pub fn set(&mut self, key: String, value: Vec, timestamp: HlcTimestamp, ttl_ms: u64, wall_ms_now: u64) { @@ -113,18 +151,15 @@ impl LwwMap { self.remove_estimated_bytes(&key); self.estimated_bytes += entry.entry_byte_size(&key); self.entries.insert(key.clone(), entry); - 
self.lru.lock().promote(&key); + self.lru.promote(&key); } pub fn delete(&mut self, key: &str, timestamp: HlcTimestamp, wall_ms_now: u64) { - match self.entries.get(key) { + let ttl_ms = match self.entries.get(key) { Some(existing) if existing.timestamp >= timestamp => return, - _ => {} - } - let ttl_ms = self - .entries - .get(key) - .map_or(60_000, |e| e.ttl_ms.max(60_000)); + Some(existing) => existing.ttl_ms.max(60_000), + None => 60_000, + }; let entry = LwwEntry { value: None, timestamp, @@ -134,7 +169,7 @@ impl LwwMap { self.remove_estimated_bytes(key); self.estimated_bytes += entry.entry_byte_size(key); self.entries.insert(key.to_string(), entry); - self.lru.lock().remove(key); + self.lru.remove(key); } pub fn merge_entry(&mut self, key: String, remote: LwwEntry) -> bool { @@ -145,10 +180,9 @@ impl LwwMap { self.remove_estimated_bytes(&key); self.estimated_bytes += remote.entry_byte_size(&key); self.entries.insert(key.clone(), remote); - let mut lru = self.lru.lock(); match is_tombstone { - true => lru.remove(&key), - false => lru.promote(&key), + true => self.lru.remove(&key), + false => self.lru.promote(&key), } true } @@ -175,10 +209,7 @@ impl LwwMap { expired_keys.iter().for_each(|key| { self.remove_estimated_bytes(key); self.entries.remove(key); - }); - let mut lru = self.lru.lock(); - expired_keys.iter().for_each(|key| { - lru.remove(key); + self.lru.remove(key); }); } @@ -192,20 +223,23 @@ impl LwwMap { expired_keys.iter().for_each(|key| { self.remove_estimated_bytes(key); self.entries.remove(key); - }); - let mut lru = self.lru.lock(); - expired_keys.iter().for_each(|key| { - lru.remove(key); + self.lru.remove(key); }); } pub fn evict_lru(&mut self) -> Option { - let key = self.lru.lock().pop_least_recent()?; + let key = self.lru.pop_least_recent()?; self.remove_estimated_bytes(&key); self.entries.remove(&key); Some(key) } + pub fn touch(&mut self, key: &str) { + if self.entries.contains_key(key) { + self.lru.promote(key); + } + } + pub fn estimated_bytes(&self) -> usize { self.estimated_bytes } @@ -359,14 +393,13 @@ mod tests { } #[test] - fn lru_eviction() { + fn lru_eviction_by_write_order() { let mut map = LwwMap::new(); map.set("k1".into(), b"a".to_vec(), ts(100, 0, 1), 60_000, 100); map.set("k2".into(), b"b".to_vec(), ts(101, 0, 1), 60_000, 101); map.set("k3".into(), b"c".to_vec(), ts(102, 0, 1), 60_000, 102); - let _ = map.get("k1", 102); let evicted = map.evict_lru(); - assert_eq!(evicted.as_deref(), Some("k2")); + assert_eq!(evicted.as_deref(), Some("k1")); } #[test] diff --git a/crates/tranquil-ripple/src/crdt/mod.rs b/crates/tranquil-ripple/src/crdt/mod.rs index 06005a2..1fa6853 100644 --- a/crates/tranquil-ripple/src/crdt/mod.rs +++ b/crates/tranquil-ripple/src/crdt/mod.rs @@ -3,105 +3,214 @@ pub mod hlc; pub mod lww_map; pub mod g_counter; +use crate::config::fnv1a; use delta::CrdtDelta; use hlc::{Hlc, HlcTimestamp}; -use lww_map::LwwMap; +use lww_map::{LwwDelta, LwwMap}; use g_counter::RateLimitStore; +use parking_lot::{Mutex, RwLock}; use std::time::{SystemTime, UNIX_EPOCH}; -pub struct CrdtStore { - hlc: Hlc, +const SHARD_COUNT: usize = 64; +const MAX_PROMOTIONS_PER_SHARD: usize = 8192; +const MAX_REPLICABLE_VALUE_SIZE: usize = 15 * 1024 * 1024; + +struct CrdtShard { cache: LwwMap, rate_limits: RateLimitStore, last_broadcast_ts: HlcTimestamp, } -impl CrdtStore { - pub fn new(node_id: u64) -> Self { +impl CrdtShard { + fn new(node_id: u64) -> Self { Self { - hlc: Hlc::new(node_id), cache: LwwMap::new(), rate_limits: RateLimitStore::new(node_id), 
last_broadcast_ts: HlcTimestamp::ZERO, } } +} + +pub struct ShardedCrdtStore { + hlc: Mutex, + shards: Box<[RwLock]>, + promotions: Box<[Mutex>]>, + shard_mask: usize, + node_id: u64, +} + +impl ShardedCrdtStore { + pub fn new(node_id: u64) -> Self { + const { assert!(SHARD_COUNT.is_power_of_two()) }; + let shards: Vec> = (0..SHARD_COUNT) + .map(|_| RwLock::new(CrdtShard::new(node_id))) + .collect(); + let promotions: Vec>> = (0..SHARD_COUNT) + .map(|_| Mutex::new(Vec::new())) + .collect(); + Self { + hlc: Mutex::new(Hlc::new(node_id)), + shards: shards.into_boxed_slice(), + promotions: promotions.into_boxed_slice(), + shard_mask: SHARD_COUNT - 1, + node_id, + } + } + + fn shard_for(&self, key: &str) -> usize { + fnv1a(key.as_bytes()) as usize & self.shard_mask + } fn wall_ms_now() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64 + u64::try_from( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(), + ) + .unwrap_or(u64::MAX) } pub fn cache_get(&self, key: &str) -> Option> { - self.cache.get(key, Self::wall_ms_now()) + let idx = self.shard_for(key); + let result = self.shards[idx].read().cache.get(key, Self::wall_ms_now()); + if result.is_some() { + let mut promos = self.promotions[idx].lock(); + if promos.len() < MAX_PROMOTIONS_PER_SHARD { + promos.push(key.to_string()); + } + } + result } - pub fn cache_set(&mut self, key: String, value: Vec, ttl_ms: u64) { - let ts = self.hlc.now(); - self.cache.set(key, value, ts, ttl_ms, Self::wall_ms_now()); + pub fn cache_set(&self, key: String, value: Vec, ttl_ms: u64) { + if value.len() > MAX_REPLICABLE_VALUE_SIZE { + tracing::warn!( + key = %key, + value_size = value.len(), + max = MAX_REPLICABLE_VALUE_SIZE, + "value exceeds replicable size limit, may fail to replicate to peers" + ); + } + let ts = self.hlc.lock().now(); + let wall = Self::wall_ms_now(); + self.shards[self.shard_for(&key)] + .write() + .cache + .set(key, value, ts, ttl_ms, wall); } - pub fn cache_delete(&mut self, key: &str) { - let ts = self.hlc.now(); - self.cache.delete(key, ts, Self::wall_ms_now()); + pub fn cache_delete(&self, key: &str) { + let ts = self.hlc.lock().now(); + let wall = Self::wall_ms_now(); + self.shards[self.shard_for(key)] + .write() + .cache + .delete(key, ts, wall); } pub fn rate_limit_peek(&self, key: &str, window_ms: u64) -> u64 { - self.rate_limits + self.shards[self.shard_for(key)] + .read() + .rate_limits .peek_count(key, window_ms, Self::wall_ms_now()) } - pub fn rate_limit_check(&mut self, key: &str, limit: u32, window_ms: u64) -> bool { - self.rate_limits + pub fn rate_limit_check(&self, key: &str, limit: u32, window_ms: u64) -> bool { + self.shards[self.shard_for(key)] + .write() + .rate_limits .check_and_increment(key, limit, window_ms, Self::wall_ms_now()) } pub fn peek_broadcast_delta(&self) -> CrdtDelta { - let cache_delta = { - let d = self.cache.extract_delta_since(self.last_broadcast_ts); - match d.entries.is_empty() { - true => None, - false => Some(d), - } + let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new(); + let mut rate_limit_deltas: Vec = Vec::new(); + + self.shards.iter().for_each(|shard_lock| { + let shard = shard_lock.read(); + let lww_delta = shard.cache.extract_delta_since(shard.last_broadcast_ts); + cache_entries.extend(lww_delta.entries); + rate_limit_deltas.extend(shard.rate_limits.extract_dirty_deltas()); + }); + + let cache_delta = match cache_entries.is_empty() { + true => None, + false => Some(LwwDelta { entries: 
cache_entries }), }; - let rate_limit_deltas = self.rate_limits.extract_dirty_deltas(); + CrdtDelta { version: 1, - source_node: self.hlc.node_id(), + source_node: self.node_id, cache_delta, rate_limit_deltas, } } - pub fn commit_broadcast(&mut self, delta: &CrdtDelta) { - let max_ts = delta + pub fn commit_broadcast(&self, delta: &CrdtDelta) { + let cache_entries_by_shard: Vec<(usize, &HlcTimestamp)> = delta .cache_delta .as_ref() - .and_then(|d| d.entries.iter().map(|(_, e)| e.timestamp).max()) - .unwrap_or(self.last_broadcast_ts); - self.last_broadcast_ts = max_ts; - let committed_keys: std::collections::HashSet<&str> = delta + .map(|d| { + d.entries + .iter() + .map(|(key, entry)| (self.shard_for(key), &entry.timestamp)) + .collect() + }) + .unwrap_or_default(); + + let mut max_ts_per_shard: Vec> = (0..self.shards.len()) + .map(|_| None) + .collect(); + + cache_entries_by_shard.iter().for_each(|&(shard_idx, ts)| { + let slot = &mut max_ts_per_shard[shard_idx]; + *slot = Some(match slot { + Some(existing) if *existing >= *ts => *existing, + _ => *ts, + }); + }); + + let rl_index: std::collections::HashMap<&str, &g_counter::GCounter> = delta .rate_limit_deltas .iter() - .map(|d| d.key.as_str()) + .map(|d| (d.key.as_str(), &d.counter)) .collect(); - committed_keys.iter().for_each(|&key| { - let still_matches = self - .rate_limits - .peek_dirty_counter(key) - .zip(delta.rate_limit_deltas.iter().find(|d| d.key == key)) - .is_some_and(|(current, committed)| { - current.window_start_ms == committed.counter.window_start_ms - && current.total() == committed.counter.total() - }); - if still_matches { - self.rate_limits.clear_single_dirty(key); + + let mut shard_rl_keys: Vec> = (0..self.shards.len()) + .map(|_| Vec::new()) + .collect(); + rl_index.keys().for_each(|&key| { + shard_rl_keys[self.shard_for(key)].push(key); + }); + + self.shards.iter().enumerate().for_each(|(idx, shard_lock)| { + let has_cache_update = max_ts_per_shard[idx].is_some(); + let has_rl_keys = !shard_rl_keys[idx].is_empty(); + if !has_cache_update && !has_rl_keys { + return; } + let mut shard = shard_lock.write(); + if let Some(max_ts) = max_ts_per_shard[idx] { + shard.last_broadcast_ts = max_ts; + } + shard_rl_keys[idx].iter().for_each(|&key| { + let still_matches = shard + .rate_limits + .peek_dirty_counter(key) + .zip(rl_index.get(key)) + .is_some_and(|(current, committed)| { + current.window_start_ms == committed.window_start_ms + && current.total() == committed.total() + }); + if still_matches { + shard.rate_limits.clear_single_dirty(key); + } + }); }); } - pub fn merge_delta(&mut self, delta: &CrdtDelta) -> bool { + pub fn merge_delta(&self, delta: &CrdtDelta) -> bool { if !delta.is_compatible() { tracing::warn!( version = delta.version, @@ -109,43 +218,136 @@ impl CrdtStore { ); return false; } - let mut changed = false; + if let Some(ref cache_delta) = delta.cache_delta { + if let Some(max_ts) = cache_delta.entries.iter().map(|(_, e)| e.timestamp).max() { + let _ = self.hlc.lock().receive(max_ts); + } + } + + let mut changed = false; + + if let Some(ref cache_delta) = delta.cache_delta { + let mut entries_by_shard: Vec> = + (0..self.shards.len()).map(|_| Vec::new()).collect(); + cache_delta.entries.iter().for_each(|(key, entry)| { - let _ = self.hlc.receive(entry.timestamp); - if self.cache.merge_entry(key.clone(), entry.clone()) { - changed = true; + entries_by_shard[self.shard_for(key)].push((key.clone(), entry.clone())); + }); + + entries_by_shard.into_iter().enumerate().for_each(|(idx, entries)| { + if 
entries.is_empty() { + return; } + let mut shard = self.shards[idx].write(); + entries.into_iter().for_each(|(key, entry)| { + if shard.cache.merge_entry(key, entry) { + changed = true; + } + }); }); } - delta.rate_limit_deltas.iter().for_each(|rd| { - if self - .rate_limits - .merge_counter(rd.key.clone(), &rd.counter) - { - changed = true; - } - }); + + if !delta.rate_limit_deltas.is_empty() { + let mut rl_by_shard: Vec> = + (0..self.shards.len()).map(|_| Vec::new()).collect(); + + delta.rate_limit_deltas.iter().for_each(|rd| { + rl_by_shard[self.shard_for(&rd.key)].push((rd.key.clone(), &rd.counter)); + }); + + rl_by_shard.into_iter().enumerate().for_each(|(idx, entries)| { + if entries.is_empty() { + return; + } + let mut shard = self.shards[idx].write(); + entries.into_iter().for_each(|(key, counter)| { + if shard.rate_limits.merge_counter(key, counter) { + changed = true; + } + }); + }); + } + changed } - pub fn run_maintenance(&mut self) { + pub fn run_maintenance(&self) { let now = Self::wall_ms_now(); - self.cache.gc_tombstones(now); - self.cache.gc_expired(now); - self.rate_limits.gc_expired(now); + self.shards.iter().enumerate().for_each(|(idx, shard_lock)| { + let pending: Vec = self.promotions[idx].lock().drain(..).collect(); + let mut shard = shard_lock.write(); + pending.iter().for_each(|key| shard.cache.touch(key)); + shard.cache.gc_tombstones(now); + shard.cache.gc_expired(now); + shard.rate_limits.gc_expired(now); + }); + } + + pub fn peek_full_state(&self) -> CrdtDelta { + let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new(); + let mut rate_limit_deltas: Vec = Vec::new(); + + self.shards.iter().for_each(|shard_lock| { + let shard = shard_lock.read(); + let lww_delta = shard.cache.extract_delta_since(HlcTimestamp::ZERO); + cache_entries.extend(lww_delta.entries); + rate_limit_deltas.extend(shard.rate_limits.extract_all_deltas()); + }); + + let cache_delta = match cache_entries.is_empty() { + true => None, + false => Some(LwwDelta { entries: cache_entries }), + }; + + CrdtDelta { + version: 1, + source_node: self.node_id, + cache_delta, + rate_limit_deltas, + } } pub fn cache_estimated_bytes(&self) -> usize { - self.cache.estimated_bytes() + self.shards + .iter() + .map(|s| s.read().cache.estimated_bytes()) + .fold(0usize, usize::saturating_add) } pub fn rate_limit_estimated_bytes(&self) -> usize { - self.rate_limits.estimated_bytes() + self.shards + .iter() + .map(|s| s.read().rate_limits.estimated_bytes()) + .fold(0usize, usize::saturating_add) } - pub fn evict_lru(&mut self) -> Option { - self.cache.evict_lru() + pub fn total_estimated_bytes(&self) -> usize { + self.shards + .iter() + .map(|s| { + let shard = s.read(); + shard.cache.estimated_bytes().saturating_add(shard.rate_limits.estimated_bytes()) + }) + .fold(0usize, usize::saturating_add) + } + + pub fn evict_lru_round_robin(&self, start_shard: usize) -> Option<(usize, usize)> { + (0..self.shards.len()).find_map(|offset| { + let idx = (start_shard + offset) & self.shard_mask; + let has_entries = self.shards[idx].read().cache.len() > 0; + match has_entries { + true => { + let mut shard = self.shards[idx].write(); + let before = shard.cache.estimated_bytes(); + shard.cache.evict_lru().map(|_| { + let freed = before.saturating_sub(shard.cache.estimated_bytes()); + ((idx + 1) & self.shard_mask, freed) + }) + } + false => None, + } + }) } } @@ -155,15 +357,15 @@ mod tests { #[test] fn roundtrip_cache() { - let mut store = CrdtStore::new(1); + let store = ShardedCrdtStore::new(1); 
store.cache_set("key".into(), b"value".to_vec(), 60_000); assert_eq!(store.cache_get("key"), Some(b"value".to_vec())); } #[test] fn delta_merge_convergence() { - let mut store_a = CrdtStore::new(1); - let mut store_b = CrdtStore::new(2); + let store_a = ShardedCrdtStore::new(1); + let store_b = ShardedCrdtStore::new(2); store_a.cache_set("x".into(), b"from_a".to_vec(), 60_000); store_b.cache_set("y".into(), b"from_b".to_vec(), 60_000); @@ -184,8 +386,8 @@ mod tests { #[test] fn rate_limit_across_stores() { - let mut store_a = CrdtStore::new(1); - let mut store_b = CrdtStore::new(2); + let store_a = ShardedCrdtStore::new(1); + let store_b = ShardedCrdtStore::new(2); store_a.rate_limit_check("rl:test", 5, 60_000); store_a.rate_limit_check("rl:test", 5, 60_000); @@ -202,7 +404,7 @@ mod tests { #[test] fn incompatible_version_rejected() { - let mut store = CrdtStore::new(1); + let store = ShardedCrdtStore::new(1); let delta = CrdtDelta { version: 255, source_node: 99, diff --git a/crates/tranquil-ripple/src/engine.rs b/crates/tranquil-ripple/src/engine.rs index 3000182..32f3adb 100644 --- a/crates/tranquil-ripple/src/engine.rs +++ b/crates/tranquil-ripple/src/engine.rs @@ -1,11 +1,11 @@ use crate::cache::RippleCache; use crate::config::RippleConfig; -use crate::crdt::CrdtStore; +use crate::crdt::ShardedCrdtStore; use crate::eviction::MemoryBudget; use crate::gossip::{GossipEngine, PeerId}; +use crate::metrics; use crate::rate_limiter::RippleRateLimiter; use crate::transport::Transport; -use parking_lot::RwLock; use std::net::SocketAddr; use std::sync::Arc; use tokio_util::sync::CancellationToken; @@ -18,7 +18,7 @@ impl RippleEngine { config: RippleConfig, shutdown: CancellationToken, ) -> Result<(Arc, Arc, SocketAddr), RippleStartError> { - let store = Arc::new(RwLock::new(CrdtStore::new(config.machine_id))); + let store = Arc::new(ShardedCrdtStore::new(config.machine_id)); let (transport, incoming_rx) = Transport::bind(config.bind_addr, config.machine_id, shutdown.clone()) .await @@ -27,10 +27,18 @@ impl RippleEngine { let transport = Arc::new(transport); let bound_addr = transport.local_addr(); + let generation = u32::try_from( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + % u64::from(u32::MAX), + ) + .unwrap_or(0); let local_id = PeerId { addr: bound_addr, machine_id: config.machine_id, - generation: 0, + generation, }; let gossip = GossipEngine::new(transport, store.clone(), local_id); @@ -51,7 +59,7 @@ impl RippleEngine { tokio::select! 
{ _ = eviction_shutdown.cancelled() => break, _ = interval.tick() => { - budget.enforce(&mut store_for_eviction.write()); + budget.enforce(&store_for_eviction); } } } @@ -74,6 +82,8 @@ impl RippleEngine { let rate_limiter: Arc = Arc::new(RippleRateLimiter::new(store)); + metrics::describe_metrics(); + tracing::info!( bind = %bound_addr, machine_id = config.machine_id, diff --git a/crates/tranquil-ripple/src/eviction.rs b/crates/tranquil-ripple/src/eviction.rs index da15aee..9bbbdd8 100644 --- a/crates/tranquil-ripple/src/eviction.rs +++ b/crates/tranquil-ripple/src/eviction.rs @@ -1,45 +1,72 @@ -use crate::crdt::CrdtStore; +use crate::crdt::ShardedCrdtStore; +use crate::metrics; pub struct MemoryBudget { max_bytes: usize, + next_shard: std::sync::atomic::AtomicUsize, } impl MemoryBudget { pub fn new(max_bytes: usize) -> Self { - Self { max_bytes } + Self { + max_bytes, + next_shard: std::sync::atomic::AtomicUsize::new(0), + } } - pub fn enforce(&self, store: &mut CrdtStore) { + pub fn enforce(&self, store: &ShardedCrdtStore) { store.run_maintenance(); + let cache_bytes = store.cache_estimated_bytes(); + let rl_bytes = store.rate_limit_estimated_bytes(); + metrics::set_cache_bytes(cache_bytes); + metrics::set_rate_limit_bytes(rl_bytes); + let max_bytes = self.max_bytes; - let total_bytes = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); - let overshoot_ratio = match total_bytes > max_bytes && max_bytes > 0 { - true => total_bytes / max_bytes, + let total_bytes = cache_bytes.saturating_add(rl_bytes); + let overshoot_pct = match total_bytes > max_bytes && max_bytes > 0 { + true => total_bytes.saturating_sub(max_bytes).saturating_mul(100) / max_bytes, false => 0, }; const BASE_BATCH: usize = 256; - let batch_size = match overshoot_ratio { - 0..=1 => BASE_BATCH, - 2..=4 => BASE_BATCH * 4, + let batch_size = match overshoot_pct { + 0..=25 => BASE_BATCH, + 26..=100 => BASE_BATCH * 4, _ => BASE_BATCH * 8, }; - let evicted = std::iter::from_fn(|| { - let current = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); - match current > max_bytes { - true => store.evict_lru(), - false => None, + let mut remaining = total_bytes; + let mut next_shard: usize = self.next_shard.load(std::sync::atomic::Ordering::Relaxed); + let mut evicted: usize = 0; + (0..batch_size).try_for_each(|_| { + match remaining > max_bytes { + true => { + match store.evict_lru_round_robin(next_shard) { + Some((ns, freed)) => { + next_shard = ns; + remaining = remaining.saturating_sub(freed); + evicted += 1; + Ok(()) + } + None => Err(()), + } + } + false => Err(()), } }) - .take(batch_size) - .count(); + .ok(); + self.next_shard.store(next_shard, std::sync::atomic::Ordering::Relaxed); if evicted > 0 { + metrics::record_evictions(evicted); + let cache_bytes_after = store.cache_estimated_bytes(); + let rl_bytes_after = store.rate_limit_estimated_bytes(); + metrics::set_cache_bytes(cache_bytes_after); + metrics::set_rate_limit_bytes(rl_bytes_after); tracing::info!( evicted_entries = evicted, - cache_bytes = store.cache_estimated_bytes(), - rate_limit_bytes = store.rate_limit_estimated_bytes(), + cache_bytes = cache_bytes_after, + rate_limit_bytes = rl_bytes_after, max_bytes = self.max_bytes, "memory budget eviction" ); @@ -53,16 +80,16 @@ mod tests { #[test] fn eviction_under_budget() { - let mut store = CrdtStore::new(1); + let store = ShardedCrdtStore::new(1); let budget = MemoryBudget::new(1024 * 1024); store.cache_set("k".into(), vec![1, 2, 3], 60_000); - 
budget.enforce(&mut store); + budget.enforce(&store); assert!(store.cache_get("k").is_some()); } #[test] fn eviction_over_budget() { - let mut store = CrdtStore::new(1); + let store = ShardedCrdtStore::new(1); let budget = MemoryBudget::new(100); (0..50).for_each(|i| { store.cache_set( @@ -71,8 +98,7 @@ mod tests { 60_000, ); }); - budget.enforce(&mut store); - let total = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); - assert!(total <= 100); + budget.enforce(&store); + assert!(store.total_estimated_bytes() <= 100); } } diff --git a/crates/tranquil-ripple/src/gossip.rs b/crates/tranquil-ripple/src/gossip.rs index 118ac85..9e3bec4 100644 --- a/crates/tranquil-ripple/src/gossip.rs +++ b/crates/tranquil-ripple/src/gossip.rs @@ -1,8 +1,9 @@ use crate::crdt::delta::CrdtDelta; -use crate::crdt::CrdtStore; +use crate::crdt::lww_map::LwwDelta; +use crate::crdt::ShardedCrdtStore; +use crate::metrics; use crate::transport::{ChannelTag, IncomingFrame, Transport}; use foca::{Config, Foca, Notification, Runtime, Timer}; -use parking_lot::RwLock; use rand::rngs::StdRng; use rand::SeedableRng; use std::collections::HashSet; @@ -114,6 +115,10 @@ impl MemberTracker { fn active_peers(&self) -> impl Iterator + '_ { self.active_addrs.iter().copied() } + + fn peer_count(&self) -> usize { + self.active_addrs.len() + } } impl Runtime for &mut BufferedRuntime { @@ -141,14 +146,14 @@ impl Runtime for &mut BufferedRuntime { pub struct GossipEngine { transport: Arc, - store: Arc>, + store: Arc, local_id: PeerId, } impl GossipEngine { pub fn new( transport: Arc, - store: Arc>, + store: Arc, local_id: PeerId, ) -> Self { Self { @@ -203,11 +208,11 @@ impl GossipEngine { } }); - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); let mut gossip_tick = tokio::time::interval(Duration::from_millis(gossip_interval_ms)); - let mut maintenance_tick = tokio::time::interval(Duration::from_secs(10)); + gossip_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! 
{ @@ -222,11 +227,13 @@ impl GossipEngine { if let Err(e) = foca.handle_data(&frame.data, &mut runtime) { tracing::warn!(error = %e, "foca handle_data error"); } - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); } ChannelTag::CrdtSync => { const MAX_DELTA_ENTRIES: usize = 10_000; const MAX_DELTA_RATE_LIMITS: usize = 10_000; + metrics::record_gossip_delta_received(); + metrics::record_gossip_delta_bytes(frame.data.len()); match bincode::serde::decode_from_slice::(&frame.data, bincode::config::standard()) { Ok((delta, _)) => { let cache_len = delta.cache_delta.as_ref().map_or(0, |d| d.entries.len()); @@ -235,6 +242,7 @@ impl GossipEngine { let window_mismatch = delta.rate_limit_deltas.iter().any(|rd| rd.counter.window_duration_ms == 0); match cache_len > MAX_DELTA_ENTRIES || rl_len > MAX_DELTA_RATE_LIMITS || gcounter_oversize || window_mismatch { true => { + metrics::record_gossip_drop(); tracing::warn!( cache_entries = cache_len, rate_limit_entries = rl_len, @@ -243,11 +251,14 @@ impl GossipEngine { ); } false => { - store.write().merge_delta(&delta); + if store.merge_delta(&delta) { + metrics::record_gossip_merge(); + } } } } Err(e) => { + metrics::record_gossip_drop(); tracing::warn!(error = %e, "failed to decode crdt sync delta"); } } @@ -256,72 +267,76 @@ impl GossipEngine { } } _ = gossip_tick.tick() => { - let pending = { - let s = store.read(); - let delta = s.peek_broadcast_delta(); - match delta.is_empty() { - true => None, - false => { - match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) { - Ok(bytes) => Some((bytes, delta)), - Err(e) => { - tracing::warn!(error = %e, "failed to serialize broadcast delta"); - None - } - } - }, - } - }; - if let Some((ref data, ref delta)) = pending { - let peers: Vec = members.active_peers().collect(); - let mut all_queued = true; - let cancel = shutdown.clone(); - peers.iter().for_each(|&addr| { - match transport.try_queue(addr, ChannelTag::CrdtSync, data) { - true => {} - false => { - all_queued = false; - let t = transport.clone(); - let d = data.clone(); - let c = cancel.clone(); - tokio::spawn(async move { - tokio::select! 
{ - _ = c.cancelled() => {} - _ = t.send(addr, ChannelTag::CrdtSync, &d) => {} - } - }); - } - } - }); - let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS); - if all_queued || peers.is_empty() || stale { - if stale && !all_queued { - tracing::warn!( - elapsed_secs = last_commit.elapsed().as_secs(), - "force-advancing broadcast watermark (staleness cap)" - ); - } - store.write().commit_broadcast(delta); + let delta = store.peek_broadcast_delta(); + match delta.is_empty() { + true => { last_commit = tokio::time::Instant::now(); } - } + false => { + let chunks = chunk_and_serialize(&delta); + match chunks.is_empty() { + true => { + tracing::warn!("all delta chunks failed to serialize, force-committing watermark"); + store.commit_broadcast(&delta); + last_commit = tokio::time::Instant::now(); + } + false => { + let peers: Vec = members.active_peers().collect(); + let mut all_queued = true; + let cancel = shutdown.clone(); + chunks.iter().for_each(|chunk| { + metrics::record_gossip_delta_bytes(chunk.len()); + peers.iter().for_each(|&addr| { + metrics::record_gossip_delta_sent(); + match transport.try_queue(addr, ChannelTag::CrdtSync, chunk) { + true => {} + false => { + all_queued = false; + let t = transport.clone(); + let d = chunk.clone(); + let c = cancel.clone(); + tokio::spawn(async move { + tokio::select! { + _ = c.cancelled() => {} + _ = t.send(addr, ChannelTag::CrdtSync, &d) => {} + } + }); + } + } + }); + }); + let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS); + if all_queued || peers.is_empty() || stale { + if stale && !all_queued { + tracing::warn!( + elapsed_secs = last_commit.elapsed().as_secs(), + "force-advancing broadcast watermark (staleness cap)" + ); + } + store.commit_broadcast(&delta); + last_commit = tokio::time::Instant::now(); + } + } + } + } + }; if let Err(e) = foca.gossip(&mut runtime) { tracing::warn!(error = %e, "foca gossip error"); } - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); } Some((timer, _)) = timer_rx.recv() => { if let Err(e) = foca.handle_timer(timer, &mut runtime) { tracing::warn!(error = %e, "foca handle_timer error"); } - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); } - _ = maintenance_tick.tick() => { - store.write().run_maintenance(); + _ = tokio::time::sleep(Duration::from_secs(10)) => { tracing::trace!( members = foca.num_members(), - cache_bytes = store.read().cache_estimated_bytes(), - "maintenance cycle" + cache_bytes = store.cache_estimated_bytes(), + rate_limit_bytes = store.rate_limit_estimated_bytes(), + "gossip health check" ); } } @@ -331,25 +346,21 @@ impl GossipEngine { } fn flush_final_delta( - store: &Arc>, + store: &Arc, transport: &Arc, members: &MemberTracker, ) { - let s = store.read(); - let delta = s.peek_broadcast_delta(); + let delta = store.peek_broadcast_delta(); if delta.is_empty() { return; } - match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) { - Ok(bytes) => { - members.active_peers().for_each(|addr| { - let _ = transport.try_queue(addr, ChannelTag::CrdtSync, &bytes); - }); - } - Err(e) => { - tracing::warn!(error = %e, "failed to serialize final delta on shutdown"); - } - } + let chunks = chunk_and_serialize(&delta); + chunks.iter().for_each(|chunk| { + 
members.active_peers().for_each(|addr| { + let _ = transport.try_queue(addr, ChannelTag::CrdtSync, chunk); + }); + }); + store.commit_broadcast(&delta); } fn drain_runtime_actions( @@ -357,6 +368,7 @@ fn drain_runtime_actions( transport: &Arc, timer_tx: &mpsc::Sender<(Timer, Duration)>, members: &mut MemberTracker, + store: &Arc, shutdown: &CancellationToken, ) { let actions: Vec = runtime.actions.drain(..).collect(); @@ -373,18 +385,105 @@ fn drain_runtime_actions( } RuntimeAction::ScheduleTimer(timer, duration) => { let tx = timer_tx.clone(); + let c = shutdown.clone(); tokio::spawn(async move { - tokio::time::sleep(duration).await; - let _ = tx.send((timer, duration)).await; + tokio::select! { + _ = c.cancelled() => {} + _ = tokio::time::sleep(duration) => { + let _ = tx.send((timer, duration)).await; + } + } }); } RuntimeAction::MemberUp(addr) => { tracing::info!(peer = %addr, "member up"); members.member_up(addr); + metrics::set_gossip_peers(members.peer_count()); + let snapshot = store.peek_full_state(); + if !snapshot.is_empty() { + chunk_and_serialize(&snapshot).into_iter().for_each(|chunk| { + let t = transport.clone(); + let c = shutdown.clone(); + tokio::spawn(async move { + tokio::select! { + _ = c.cancelled() => {} + _ = t.send(addr, ChannelTag::CrdtSync, &chunk) => {} + } + }); + }); + } } RuntimeAction::MemberDown(addr) => { tracing::info!(peer = %addr, "member down"); members.member_down(addr); + metrics::set_gossip_peers(members.peer_count()); } }); } + +fn chunk_and_serialize(delta: &CrdtDelta) -> Vec> { + let config = bincode::config::standard(); + match bincode::serde::encode_to_vec(delta, config) { + Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => vec![bytes], + Ok(_) => split_and_serialize(delta.clone()), + Err(e) => { + tracing::warn!(error = %e, "failed to serialize delta"); + vec![] + } + } +} + +fn split_and_serialize(delta: CrdtDelta) -> Vec> { + let version = delta.version; + let source_node = delta.source_node; + let cache_entries = delta.cache_delta.map_or(Vec::new(), |d| d.entries); + let rl_deltas = delta.rate_limit_deltas; + + if cache_entries.is_empty() && rl_deltas.is_empty() { + return vec![]; + } + + if cache_entries.len() <= 1 && rl_deltas.len() <= 1 { + let mini = CrdtDelta { + version, + source_node, + cache_delta: match cache_entries.is_empty() { + true => None, + false => Some(LwwDelta { entries: cache_entries }), + }, + rate_limit_deltas: rl_deltas, + }; + match bincode::serde::encode_to_vec(&mini, bincode::config::standard()) { + Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => return vec![bytes], + _ => { + tracing::error!("irreducible delta entry exceeds max frame size, dropping"); + return vec![]; + } + } + } + + let mid_cache = cache_entries.len() / 2; + let mid_rl = rl_deltas.len() / 2; + + let mut left_cache = cache_entries; + let right_cache = left_cache.split_off(mid_cache); + let mut left_rl = rl_deltas; + let right_rl = left_rl.split_off(mid_rl); + + let make_sub = |entries: Vec<_>, rls| CrdtDelta { + version, + source_node, + cache_delta: match entries.is_empty() { + true => None, + false => Some(LwwDelta { entries }), + }, + rate_limit_deltas: rls, + }; + + let left = make_sub(left_cache, left_rl); + let right = make_sub(right_cache, right_rl); + + let mut result = chunk_and_serialize(&left); + result.extend(chunk_and_serialize(&right)); + result +} diff --git a/crates/tranquil-ripple/src/lib.rs b/crates/tranquil-ripple/src/lib.rs index ec49cd4..7cfecb8 100644 --- a/crates/tranquil-ripple/src/lib.rs +++ 
b/crates/tranquil-ripple/src/lib.rs @@ -4,6 +4,7 @@ pub mod crdt; pub mod engine; pub mod eviction; pub mod gossip; +pub mod metrics; pub mod rate_limiter; pub mod transport; diff --git a/crates/tranquil-ripple/src/metrics.rs b/crates/tranquil-ripple/src/metrics.rs new file mode 100644 index 0000000..c49e629 --- /dev/null +++ b/crates/tranquil-ripple/src/metrics.rs @@ -0,0 +1,108 @@ +use metrics::{counter, gauge, histogram}; + +pub fn describe_metrics() { + metrics::describe_gauge!( + "tranquil_ripple_cache_bytes", + "Estimated memory used by cache entries" + ); + metrics::describe_gauge!( + "tranquil_ripple_rate_limit_bytes", + "Estimated memory used by rate limit counters" + ); + metrics::describe_gauge!( + "tranquil_ripple_gossip_peers", + "Number of active gossip peers" + ); + metrics::describe_counter!( + "tranquil_ripple_cache_hits_total", + "Total cache read hits" + ); + metrics::describe_counter!( + "tranquil_ripple_cache_misses_total", + "Total cache read misses" + ); + metrics::describe_counter!( + "tranquil_ripple_cache_writes_total", + "Total cache write operations" + ); + metrics::describe_counter!( + "tranquil_ripple_cache_deletes_total", + "Total cache delete operations" + ); + metrics::describe_counter!( + "tranquil_ripple_evictions_total", + "Total cache entries evicted by memory budget" + ); + metrics::describe_counter!( + "tranquil_ripple_gossip_deltas_sent_total", + "Total CRDT delta chunks sent to peers" + ); + metrics::describe_counter!( + "tranquil_ripple_gossip_deltas_received_total", + "Total CRDT delta messages received from peers" + ); + metrics::describe_counter!( + "tranquil_ripple_gossip_merges_total", + "Total CRDT deltas merged with local state change" + ); + metrics::describe_counter!( + "tranquil_ripple_gossip_drops_total", + "Total CRDT deltas dropped (validation or decode failure)" + ); + metrics::describe_histogram!( + "tranquil_ripple_gossip_delta_bytes", + "Size of CRDT delta chunks in bytes" + ); +} + +pub fn record_cache_hit() { + counter!("tranquil_ripple_cache_hits_total").increment(1); +} + +pub fn record_cache_miss() { + counter!("tranquil_ripple_cache_misses_total").increment(1); +} + +pub fn record_cache_write() { + counter!("tranquil_ripple_cache_writes_total").increment(1); +} + +pub fn record_cache_delete() { + counter!("tranquil_ripple_cache_deletes_total").increment(1); +} + +pub fn set_cache_bytes(bytes: usize) { + gauge!("tranquil_ripple_cache_bytes").set(bytes as f64); +} + +pub fn set_rate_limit_bytes(bytes: usize) { + gauge!("tranquil_ripple_rate_limit_bytes").set(bytes as f64); +} + +pub fn set_gossip_peers(count: usize) { + gauge!("tranquil_ripple_gossip_peers").set(count as f64); +} + +pub fn record_evictions(count: usize) { + counter!("tranquil_ripple_evictions_total").increment(count as u64); +} + +pub fn record_gossip_delta_sent() { + counter!("tranquil_ripple_gossip_deltas_sent_total").increment(1); +} + +pub fn record_gossip_delta_received() { + counter!("tranquil_ripple_gossip_deltas_received_total").increment(1); +} + +pub fn record_gossip_merge() { + counter!("tranquil_ripple_gossip_merges_total").increment(1); +} + +pub fn record_gossip_drop() { + counter!("tranquil_ripple_gossip_drops_total").increment(1); +} + +pub fn record_gossip_delta_bytes(bytes: usize) { + histogram!("tranquil_ripple_gossip_delta_bytes").record(bytes as f64); +} diff --git a/crates/tranquil-ripple/src/rate_limiter.rs b/crates/tranquil-ripple/src/rate_limiter.rs index 7250497..93e84ca 100644 --- a/crates/tranquil-ripple/src/rate_limiter.rs +++ 
b/crates/tranquil-ripple/src/rate_limiter.rs @@ -1,15 +1,14 @@ -use crate::crdt::CrdtStore; +use crate::crdt::ShardedCrdtStore; use async_trait::async_trait; -use parking_lot::RwLock; use std::sync::Arc; use tranquil_infra::DistributedRateLimiter; pub struct RippleRateLimiter { - store: Arc>, + store: Arc, } impl RippleRateLimiter { - pub fn new(store: Arc>) -> Self { + pub fn new(store: Arc) -> Self { Self { store } } } @@ -17,11 +16,11 @@ impl RippleRateLimiter { #[async_trait] impl DistributedRateLimiter for RippleRateLimiter { async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool { - self.store.write().rate_limit_check(key, limit, window_ms) + self.store.rate_limit_check(key, limit, window_ms) } async fn peek_rate_limit_count(&self, key: &str, window_ms: u64) -> u64 { - self.store.read().rate_limit_peek(key, window_ms) + self.store.rate_limit_peek(key, window_ms) } } @@ -31,7 +30,7 @@ mod tests { #[tokio::test] async fn rate_limiter_trait_allows_within_limit() { - let store = Arc::new(RwLock::new(CrdtStore::new(1))); + let store = Arc::new(ShardedCrdtStore::new(1)); let rl = RippleRateLimiter::new(store); assert!(rl.check_rate_limit("test", 5, 60_000).await); assert!(rl.check_rate_limit("test", 5, 60_000).await); @@ -39,7 +38,7 @@ mod tests { #[tokio::test] async fn rate_limiter_trait_blocks_over_limit() { - let store = Arc::new(RwLock::new(CrdtStore::new(1))); + let store = Arc::new(ShardedCrdtStore::new(1)); let rl = RippleRateLimiter::new(store); assert!(rl.check_rate_limit("k", 3, 60_000).await); assert!(rl.check_rate_limit("k", 3, 60_000).await); diff --git a/crates/tranquil-ripple/src/transport.rs b/crates/tranquil-ripple/src/transport.rs index 93cd35e..97a6a38 100644 --- a/crates/tranquil-ripple/src/transport.rs +++ b/crates/tranquil-ripple/src/transport.rs @@ -3,15 +3,16 @@ use bytes::{Buf, BufMut, BytesMut}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024; +pub(crate) const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024; const MAX_INBOUND_CONNECTIONS: usize = 512; +const MAX_OUTBOUND_CONNECTIONS: usize = 512; const WRITE_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -44,6 +45,7 @@ pub struct IncomingFrame { struct ConnectionWriter { tx: mpsc::Sender>, + generation: u64, } pub struct Transport { @@ -51,8 +53,10 @@ pub struct Transport { _machine_id: u64, connections: Arc>>, connecting: Arc>>, + conn_generation: Arc, #[allow(dead_code)] inbound_count: Arc, + outbound_count: Arc, shutdown: CancellationToken, incoming_tx: mpsc::Sender, } @@ -73,7 +77,9 @@ impl Transport { _machine_id: machine_id, connections: Arc::new(parking_lot::Mutex::new(HashMap::new())), connecting: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())), + conn_generation: Arc::new(AtomicU64::new(0)), inbound_count: inbound_count.clone(), + outbound_count: Arc::new(AtomicUsize::new(0)), shutdown: shutdown.clone(), incoming_tx: incoming_tx.clone(), }; @@ -99,6 +105,7 @@ impl Transport { continue; } inbound_counter.fetch_add(1, Ordering::Relaxed); + configure_socket(&stream); Self::spawn_reader( stream, peer_addr, @@ -144,12 +151,20 @@ impl Transport { }; let writer = { let 
conns = self.connections.lock(); - conns.get(&target).map(|w| w.tx.clone()) + conns.get(&target).map(|w| (w.tx.clone(), w.generation)) }; match writer { - Some(tx) => { + Some((tx, acquired_gen)) => { if tx.send(frame).await.is_err() { - self.connections.lock().remove(&target); + { + let mut conns = self.connections.lock(); + let stale = conns + .get(&target) + .is_some_and(|w| w.generation == acquired_gen); + if stale { + conns.remove(&target); + } + } self.connect_and_send(target, tag, data).await; } } @@ -163,7 +178,7 @@ impl Transport { { let mut connecting = self.connecting.lock(); if connecting.contains(&target) { - tracing::debug!(peer = %target, "connection already in-flight, dropping frame"); + tracing::warn!(peer = %target, "connection already in-flight, dropping frame"); return; } connecting.insert(target); @@ -191,17 +206,39 @@ impl Transport { .await; match stream { Ok(stream) => { + if self.outbound_count.load(Ordering::Relaxed) >= MAX_OUTBOUND_CONNECTIONS { + tracing::warn!( + peer = %target, + max = MAX_OUTBOUND_CONNECTIONS, + "outbound connection limit reached, dropping" + ); + return; + } + self.outbound_count.fetch_add(1, Ordering::Relaxed); + configure_socket(&stream); let (read_half, write_half) = stream.into_split(); let (write_tx, mut write_rx) = mpsc::channel::>(1024); - let cancel = self.shutdown.clone(); + + let conn_gen = self.conn_generation.fetch_add(1, Ordering::Relaxed); + self.connections.lock().insert( + target, + ConnectionWriter { tx: write_tx.clone(), generation: conn_gen }, + ); + if let Some(frame) = encode_frame(tag, data) { + let _ = write_tx.try_send(frame); + } + + let conn_cancel = self.shutdown.child_token(); + let reader_cancel = conn_cancel.clone(); let connections = self.connections.clone(); + let outbound_counter = self.outbound_count.clone(); let peer = target; tokio::spawn(async move { let mut writer = write_half; loop { tokio::select! 
{ - _ = cancel.cancelled() => break, + _ = conn_cancel.cancelled() => break, msg = write_rx.recv() => { match msg { Some(buf) => { @@ -227,19 +264,11 @@ impl Transport { } } connections.lock().remove(&peer); + outbound_counter.fetch_sub(1, Ordering::Relaxed); + conn_cancel.cancel(); }); - Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), self.shutdown.clone()); - - let frame = match encode_frame(tag, data) { - Some(f) => f, - None => return, - }; - let _ = write_tx.send(frame).await; - self.connections.lock().insert( - target, - ConnectionWriter { tx: write_tx }, - ); + Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), reader_cancel); tracing::debug!(peer = %target, "established outbound connection"); } Err(e) => { @@ -309,6 +338,7 @@ impl Transport { } } } + cancel.cancel(); }); } @@ -335,6 +365,19 @@ impl Transport { } } +fn configure_socket(stream: &TcpStream) { + let sock_ref = socket2::SockRef::from(stream); + if let Err(e) = sock_ref.set_tcp_nodelay(true) { + tracing::warn!(error = %e, "failed to set TCP_NODELAY"); + } + let params = socket2::TcpKeepalive::new() + .with_time(Duration::from_secs(30)) + .with_interval(Duration::from_secs(10)); + if let Err(e) = sock_ref.set_tcp_keepalive(¶ms) { + tracing::warn!(error = %e, "failed to set TCP keepalive"); + } +} + fn encode_frame(tag: ChannelTag, data: &[u8]) -> Option> { match data.len() > MAX_FRAME_SIZE { true => { diff --git a/crates/tranquil-ripple/tests/two_node_convergence.rs b/crates/tranquil-ripple/tests/two_node_convergence.rs index b275aa6..72e7d25 100644 --- a/crates/tranquil-ripple/tests/two_node_convergence.rs +++ b/crates/tranquil-ripple/tests/two_node_convergence.rs @@ -608,3 +608,163 @@ async fn two_node_rate_limit_split_increment() { shutdown.cancel(); } + +#[tokio::test] +async fn two_node_partition_recovery() { + tracing_subscriber::fmt() + .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) + .with_test_writer() + .try_init() + .ok(); + + let shutdown = CancellationToken::new(); + + let config_a = RippleConfig { + bind_addr: "127.0.0.1:0".parse().unwrap(), + seed_peers: vec![], + machine_id: 100, + gossip_interval_ms: 100, + cache_max_bytes: 64 * 1024 * 1024, + }; + let (cache_a, _rl_a, addr_a) = RippleEngine::start(config_a, shutdown.clone()) + .await + .expect("node A failed to start"); + + futures::future::join_all((0..50).map(|i| { + let cache = cache_a.clone(); + async move { + cache + .set( + &format!("pre-{i}"), + &format!("val-{i}"), + Duration::from_secs(300), + ) + .await + .expect("set on A failed"); + } + })) + .await; + + let config_b = RippleConfig { + bind_addr: "127.0.0.1:0".parse().unwrap(), + seed_peers: vec![addr_a], + machine_id: 200, + gossip_interval_ms: 100, + cache_max_bytes: 64 * 1024 * 1024, + }; + let (cache_b, _rl_b, _addr_b) = RippleEngine::start(config_b, shutdown.clone()) + .await + .expect("node B failed to start"); + + let b = cache_b.clone(); + poll_until(15_000, 200, move || { + let b = b.clone(); + async move { + futures::future::join_all((0..50).map(|i| { + let b = b.clone(); + async move { b.get(&format!("pre-{i}")).await.is_some() } + })) + .await + .into_iter() + .all(|present| present) + } + }) + .await; + + futures::future::join_all((0..50).map(|i| { + let cache = cache_b.clone(); + async move { + cache + .set( + &format!("post-{i}"), + &format!("bval-{i}"), + Duration::from_secs(300), + ) + .await + .expect("set on B failed"); + } + })) + .await; + + let a = cache_a.clone(); + poll_until(15_000, 200, move || { + let a = 
a.clone(); + async move { + futures::future::join_all((0..50).map(|i| { + let a = a.clone(); + async move { a.get(&format!("post-{i}")).await.is_some() } + })) + .await + .into_iter() + .all(|present| present) + } + }) + .await; + + shutdown.cancel(); +} + +#[tokio::test] +async fn two_node_stress_concurrent_load() { + tracing_subscriber::fmt() + .with_max_level(tracing_subscriber::filter::LevelFilter::INFO) + .with_test_writer() + .try_init() + .ok(); + + let shutdown = CancellationToken::new(); + let ((cache_a, rl_a), (cache_b, rl_b)) = spawn_pair(shutdown.clone()).await; + + let tasks: Vec> = (0u32..8).map(|task_id| { + let cache = match task_id < 4 { + true => cache_a.clone(), + false => cache_b.clone(), + }; + let rl = match task_id < 4 { + true => rl_a.clone(), + false => rl_b.clone(), + }; + tokio::spawn(async move { + let value = vec![0xABu8; 1024]; + futures::future::join_all((0u32..500).map(|op| { + let cache = cache.clone(); + let rl = rl.clone(); + let value = value.clone(); + async move { + let key_idx = op % 100; + let key = format!("stress-{task_id}-{key_idx}"); + match op % 4 { + 0 | 1 => { + cache + .set_bytes(&key, &value, Duration::from_secs(120)) + .await + .expect("set_bytes failed"); + } + 2 => { + let _ = cache.get(&key).await; + } + _ => { + let _ = rl.check_rate_limit(&key, 1000, 60_000).await; + } + } + } + })) + .await; + }) + }).collect(); + + let results = tokio::time::timeout( + Duration::from_secs(30), + futures::future::join_all(tasks), + ) + .await + .expect("stress test timed out after 30s"); + + results.into_iter().enumerate().for_each(|(i, r)| { + r.unwrap_or_else(|e| panic!("task {i} panicked: {e}")); + }); + + tokio::time::sleep(Duration::from_secs(12)).await; + + shutdown.cancel(); +}
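
Annotation (reviewer sketch, not part of the patch): the core of this change is replacing the single `RwLock<CrdtStore>` with `ShardedCrdtStore`, which splits the keyspace across 64 independently locked shards so writes to different keys no longer serialize on one lock, and the HLC moves behind its own small mutex. Below is a minimal, dependency-free sketch of that pattern, reusing the same FNV-1a hash and power-of-two mask as `ShardedCrdtStore::shard_for`; std `RwLock` stands in for `parking_lot`, and `ShardedMap` is a hypothetical name, not a type from this crate.

// Sketch of the shard-per-lock pattern this patch adopts. A single
// RwLock<Store> serializes all writers; hashing each key to one of N
// RwLock-ed shards lets writes to different keys proceed in parallel.
use std::collections::HashMap;
use std::sync::RwLock;

const SHARDS: usize = 64; // power of two, so `hash & (SHARDS - 1)` selects a shard

fn fnv1a(data: &[u8]) -> u64 {
    data.iter().fold(0xcbf29ce484222325u64, |hash, &byte| {
        (hash ^ byte as u64).wrapping_mul(0x100000001b3)
    })
}

struct ShardedMap {
    shards: Vec<RwLock<HashMap<String, Vec<u8>>>>,
}

impl ShardedMap {
    fn new() -> Self {
        Self {
            shards: (0..SHARDS).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    fn shard_for(&self, key: &str) -> usize {
        fnv1a(key.as_bytes()) as usize & (SHARDS - 1)
    }

    // Only one shard is write-locked per insert; readers and writers of
    // other shards are unaffected.
    fn set(&self, key: String, value: Vec<u8>) {
        let idx = self.shard_for(&key);
        self.shards[idx].write().unwrap().insert(key, value);
    }

    fn get(&self, key: &str) -> Option<Vec<u8>> {
        let idx = self.shard_for(key);
        self.shards[idx].read().unwrap().get(key).cloned()
    }
}

fn main() {
    let map = ShardedMap::new();
    map.set("user:42".into(), b"cached".to_vec());
    assert_eq!(map.get("user:42"), Some(b"cached".to_vec()));
}

The other technique worth calling out is `chunk_and_serialize`/`split_and_serialize`: a delta whose encoding exceeds `MAX_FRAME_SIZE` is recursively halved until every piece fits, and an irreducible single entry that is still oversized is dropped with an error log. A sketch of the same halving, under assumed names (`encode`, `chunk`) and a toy length-prefixed encoding in place of bincode so it stays self-contained:

// Sketch of recursive binary splitting to keep frames under a size cap.
const MAX_FRAME: usize = 32; // deliberately tiny so the example actually splits

fn encode(batch: &[Vec<u8>]) -> Vec<u8> {
    // Length-prefix each entry; stands in for bincode in the real code.
    batch
        .iter()
        .flat_map(|e| {
            let mut out = (e.len() as u32).to_be_bytes().to_vec();
            out.extend_from_slice(e);
            out
        })
        .collect()
}

fn chunk(batch: &[Vec<u8>]) -> Vec<Vec<u8>> {
    if batch.is_empty() {
        return vec![];
    }
    let bytes = encode(batch);
    if bytes.len() <= MAX_FRAME {
        return vec![bytes];
    }
    if batch.len() <= 1 {
        // Irreducible oversized entry: dropped, as in split_and_serialize.
        return vec![];
    }
    let (left, right) = batch.split_at(batch.len() / 2);
    let mut out = chunk(left);
    out.extend(chunk(right));
    out
}

fn main() {
    let batch: Vec<Vec<u8>> = (0..8).map(|i| vec![i; 10]).collect();
    let frames = chunk(&batch);
    assert!(frames.iter().all(|f| f.len() <= MAX_FRAME));
    println!("{} frames", frames.len()); // 112 bytes total -> 4 frames of 28 bytes
}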