feat: cache locks less

Replace the global Arc<RwLock<CrdtStore>> with a ShardedCrdtStore: 64 shards, each behind its own RwLock, with the HLC on a separate Mutex. Cache reads no longer promote LRU entries under a lock on every get; hits are queued per shard (bounded) and applied during maintenance, so eviction follows write order unless an entry is touched. Gossip deltas are chunked to fit the transport frame limit, newly joined peers receive a full-state snapshot, eviction walks shards round-robin, sockets get TCP_NODELAY and keepalive, and a new metrics module covers cache, gossip, and eviction activity. The tranquil-* workspace crates bump to 0.2.0.

This commit is contained in:
lewis
2026-02-08 12:02:29 +02:00
parent 1938eefebe
commit 07c8bf5b7f
17 changed files with 1005 additions and 283 deletions

View File

@@ -29,6 +29,10 @@ test-group = "serial-env-tests"
filter = "binary(whole_story)"
test-group = "heavy-load-tests"
[[profile.default.overrides]]
filter = "test(/two_node_stress_concurrent_load/)"
test-group = "heavy-load-tests"
[[profile.ci.overrides]]
filter = "test(/import_with_verification/) | test(/plc_migration/)"
test-group = "serial-env-tests"
@@ -40,3 +44,7 @@ test-group = "serial-env-tests"
[[profile.ci.overrides]]
filter = "binary(whole_story)"
test-group = "heavy-load-tests"
[[profile.ci.overrides]]
filter = "test(/two_node_stress_concurrent_load/)"
test-group = "heavy-load-tests"

Cargo.lock generated
View File

@@ -2778,7 +2778,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.1",
"socket2 0.5.10",
"system-configuration",
"tokio",
"tower-service",
@@ -4242,7 +4242,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls 0.23.35",
"socket2 0.6.1",
"socket2 0.5.10",
"thiserror 2.0.17",
"tokio",
"tracing",
@@ -4279,7 +4279,7 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.6.1",
"socket2 0.5.10",
"tracing",
"windows-sys 0.60.2",
]
@@ -4402,7 +4402,7 @@ dependencies = [
"pin-project-lite",
"ryu",
"sha1_smol",
"socket2 0.6.1",
"socket2 0.6.2",
"tokio",
"tokio-util",
"url",
@@ -5134,9 +5134,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.6.1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0"
dependencies = [
"libc",
"windows-sys 0.60.2",
@@ -5693,7 +5693,7 @@ dependencies = [
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.1",
"socket2 0.6.2",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -5797,7 +5797,7 @@ dependencies = [
"hyper-util",
"percent-encoding",
"pin-project",
"socket2 0.6.1",
"socket2 0.6.2",
"sync_wrapper",
"tokio",
"tokio-stream",
@@ -5963,7 +5963,7 @@ dependencies = [
[[package]]
name = "tranquil-auth"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"base32",
@@ -5985,7 +5985,7 @@ dependencies = [
[[package]]
name = "tranquil-cache"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -5998,7 +5998,7 @@ dependencies = [
[[package]]
name = "tranquil-comms"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -6012,7 +6012,7 @@ dependencies = [
[[package]]
name = "tranquil-crypto"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"aes-gcm",
"base64 0.22.1",
@@ -6028,7 +6028,7 @@ dependencies = [
[[package]]
name = "tranquil-db"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-trait",
"chrono",
@@ -6045,7 +6045,7 @@ dependencies = [
[[package]]
name = "tranquil-db-traits"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -6061,7 +6061,7 @@ dependencies = [
[[package]]
name = "tranquil-infra"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-trait",
"bytes",
@@ -6071,7 +6071,7 @@ dependencies = [
[[package]]
name = "tranquil-oauth"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"axum",
@@ -6094,7 +6094,7 @@ dependencies = [
[[package]]
name = "tranquil-pds"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"aes-gcm",
"anyhow",
@@ -6179,7 +6179,7 @@ dependencies = [
[[package]]
name = "tranquil-repo"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"bytes",
"cid",
@@ -6191,7 +6191,7 @@ dependencies = [
[[package]]
name = "tranquil-ripple"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-trait",
"backon",
@@ -6199,9 +6199,11 @@ dependencies = [
"bytes",
"foca",
"futures",
"metrics",
"parking_lot",
"rand 0.9.2",
"serde",
"socket2 0.6.2",
"thiserror 2.0.17",
"tokio",
"tokio-util",
@@ -6213,7 +6215,7 @@ dependencies = [
[[package]]
name = "tranquil-scopes"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"axum",
"futures",
@@ -6228,7 +6230,7 @@ dependencies = [
[[package]]
name = "tranquil-storage"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-trait",
"aws-config",
@@ -6244,7 +6246,7 @@ dependencies = [
[[package]]
name = "tranquil-types"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"chrono",
"cid",

View File

@@ -12,9 +12,11 @@ backon = { workspace = true }
bincode = { workspace = true }
bytes = { workspace = true }
foca = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
rand = "0.9"
serde = { workspace = true }
socket2 = "0.6.2"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["net", "io-util", "sync", "time"] }
tokio-util = { workspace = true }

View File

@@ -1,16 +1,16 @@
use crate::crdt::CrdtStore;
use crate::crdt::ShardedCrdtStore;
use crate::metrics;
use async_trait::async_trait;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Duration;
use tranquil_infra::{Cache, CacheError};
pub struct RippleCache {
store: Arc<RwLock<CrdtStore>>,
store: Arc<ShardedCrdtStore>,
}
impl RippleCache {
pub fn new(store: Arc<RwLock<CrdtStore>>) -> Self {
pub fn new(store: Arc<ShardedCrdtStore>) -> Self {
Self { store }
}
}
@@ -18,32 +18,45 @@ impl RippleCache {
#[async_trait]
impl Cache for RippleCache {
async fn get(&self, key: &str) -> Option<String> {
self.store
.read()
let result = self
.store
.cache_get(key)
.and_then(|bytes| String::from_utf8(bytes).ok())
.and_then(|bytes| String::from_utf8(bytes).ok());
match result.is_some() {
true => metrics::record_cache_hit(),
false => metrics::record_cache_miss(),
}
result
}
async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> {
let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
self.store
.write()
.cache_set(key.to_string(), value.as_bytes().to_vec(), ttl.as_millis() as u64);
.cache_set(key.to_string(), value.as_bytes().to_vec(), ttl_ms);
metrics::record_cache_write();
Ok(())
}
async fn delete(&self, key: &str) -> Result<(), CacheError> {
self.store.write().cache_delete(key);
self.store.cache_delete(key);
metrics::record_cache_delete();
Ok(())
}
async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> {
self.store.read().cache_get(key)
let result = self.store.cache_get(key);
match result.is_some() {
true => metrics::record_cache_hit(),
false => metrics::record_cache_miss(),
}
result
}
async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> {
let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX);
self.store
.write()
.cache_set(key.to_string(), value.to_vec(), ttl.as_millis() as u64);
.cache_set(key.to_string(), value.to_vec(), ttl_ms);
metrics::record_cache_write();
Ok(())
}
}
@@ -54,7 +67,7 @@ mod tests {
#[tokio::test]
async fn cache_trait_roundtrip() {
let store = Arc::new(RwLock::new(CrdtStore::new(1)));
let store = Arc::new(ShardedCrdtStore::new(1));
let cache = RippleCache::new(store);
cache
.set("test", "value", Duration::from_secs(60))
@@ -65,7 +78,7 @@ mod tests {
#[tokio::test]
async fn cache_trait_bytes() {
let store = Arc::new(RwLock::new(CrdtStore::new(1)));
let store = Arc::new(ShardedCrdtStore::new(1));
let cache = RippleCache::new(store);
let data = vec![0xDE, 0xAD, 0xBE, 0xEF];
cache
@@ -77,7 +90,7 @@ mod tests {
#[tokio::test]
async fn cache_trait_delete() {
let store = Arc::new(RwLock::new(CrdtStore::new(1)));
let store = Arc::new(ShardedCrdtStore::new(1));
let cache = RippleCache::new(store);
cache
.set("del", "x", Duration::from_secs(60))

View File

@@ -1,6 +1,6 @@
use std::net::SocketAddr;
fn fnv1a(data: &[u8]) -> u64 {
pub(crate) fn fnv1a(data: &[u8]) -> u64 {
data.iter().fold(0xcbf29ce484222325u64, |hash, &byte| {
(hash ^ byte as u64).wrapping_mul(0x100000001b3)
})
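Exposing fnv1a as pub(crate) lets the new sharded CRDT store reuse the same hash for shard selection. A minimal sketch of that use from crate-internal code (the key string is illustrative), mirroring shard_for further down in this commit:

const SHARD_COUNT: usize = 64; // power of two, so masking is equivalent to `% SHARD_COUNT`
let shard_mask = SHARD_COUNT - 1;
let idx = fnv1a("session:abc123".as_bytes()) as usize & shard_mask;
// The hash is deterministic, so the same key always maps to the same shard,
// and the mask keeps the index in 0..SHARD_COUNT.
assert!(idx < SHARD_COUNT);
assert_eq!(idx, fnv1a("session:abc123".as_bytes()) as usize & shard_mask);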

View File

@@ -163,6 +163,9 @@ impl RateLimitStore {
}
pub fn peek_count(&self, key: &str, window_ms: u64, now_wall_ms: u64) -> u64 {
if window_ms == 0 {
return 0;
}
match self.counters.get(key) {
Some(counter) if counter.window_start_ms == Self::aligned_window_start(now_wall_ms, window_ms) => {
counter.total()
@@ -195,6 +198,16 @@ impl RateLimitStore {
.fold(0usize, usize::saturating_add)
}
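// Snapshot every counter, not just the dirty ones; used to seed a newly joined peer with full state.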
pub fn extract_all_deltas(&self) -> Vec<GCounterDelta> {
self.counters
.iter()
.map(|(key, counter)| GCounterDelta {
key: key.clone(),
counter: counter.clone(),
})
.collect()
}
pub fn gc_expired(&mut self, now_wall_ms: u64) {
let expired: Vec<String> = self
.counters

View File

@@ -54,10 +54,13 @@ impl Hlc {
}
fn physical_now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
u64::try_from(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis(),
)
.unwrap_or(u64::MAX)
}
pub fn now(&mut self) -> HlcTimestamp {

View File

@@ -1,5 +1,4 @@
use super::hlc::HlcTimestamp;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
@@ -27,11 +26,37 @@ impl LwwEntry {
}
fn entry_byte_size(&self, key: &str) -> usize {
const OVERHEAD: usize = 128;
key.len()
+ self.value.as_ref().map_or(0, Vec::len)
+ std::mem::size_of::<Self>()
+ OVERHEAD
const HASHMAP_ENTRY_OVERHEAD: usize = 64;
const BTREE_NODE_OVERHEAD: usize = 64;
const STRING_HEADER: usize = 24;
const COUNTER_SIZE: usize = 8;
let key_len = key.len();
let value_len = self.value.as_ref().map_or(0, Vec::len);
let main_entry = key_len
.saturating_add(value_len)
.saturating_add(std::mem::size_of::<Self>())
.saturating_add(HASHMAP_ENTRY_OVERHEAD);
match self.is_tombstone() {
true => main_entry,
false => {
let lru_btree = COUNTER_SIZE
.saturating_add(STRING_HEADER)
.saturating_add(key_len)
.saturating_add(BTREE_NODE_OVERHEAD);
let lru_hashmap = STRING_HEADER
.saturating_add(key_len)
.saturating_add(COUNTER_SIZE)
.saturating_add(HASHMAP_ENTRY_OVERHEAD);
main_entry
.saturating_add(lru_btree)
.saturating_add(lru_hashmap)
}
}
}
}
@@ -59,11 +84,26 @@ impl LruTracker {
if let Some(old_counter) = self.key_to_counter.remove(key) {
self.counter_to_key.remove(&old_counter);
}
if self.counter >= u64::MAX - 1 {
self.compact();
}
self.counter = self.counter.saturating_add(1);
self.counter_to_key.insert(self.counter, key.to_string());
self.key_to_counter.insert(key.to_string(), self.counter);
}
fn compact(&mut self) {
let keys: Vec<String> = self.counter_to_key.values().cloned().collect();
self.counter_to_key.clear();
self.key_to_counter.clear();
keys.into_iter().enumerate().for_each(|(i, key)| {
let new_counter = (i as u64).saturating_add(1);
self.counter_to_key.insert(new_counter, key.clone());
self.key_to_counter.insert(key, new_counter);
});
self.counter = self.counter_to_key.len() as u64;
}
fn remove(&mut self, key: &str) {
if let Some(counter) = self.key_to_counter.remove(key) {
self.counter_to_key.remove(&counter);
@@ -80,7 +120,7 @@ impl LruTracker {
pub struct LwwMap {
entries: HashMap<String, LwwEntry>,
lru: Mutex<LruTracker>,
lru: LruTracker,
estimated_bytes: usize,
}
@@ -88,7 +128,7 @@ impl LwwMap {
pub fn new() -> Self {
Self {
entries: HashMap::new(),
lru: Mutex::new(LruTracker::new()),
lru: LruTracker::new(),
estimated_bytes: 0,
}
}
@@ -98,9 +138,7 @@ impl LwwMap {
if entry.is_expired(now_wall_ms) || entry.is_tombstone() {
return None;
}
let value = entry.value.clone();
self.lru.lock().promote(key);
value
entry.value.clone()
}
pub fn set(&mut self, key: String, value: Vec<u8>, timestamp: HlcTimestamp, ttl_ms: u64, wall_ms_now: u64) {
@@ -113,18 +151,15 @@ impl LwwMap {
self.remove_estimated_bytes(&key);
self.estimated_bytes += entry.entry_byte_size(&key);
self.entries.insert(key.clone(), entry);
self.lru.lock().promote(&key);
self.lru.promote(&key);
}
pub fn delete(&mut self, key: &str, timestamp: HlcTimestamp, wall_ms_now: u64) {
match self.entries.get(key) {
let ttl_ms = match self.entries.get(key) {
Some(existing) if existing.timestamp >= timestamp => return,
_ => {}
}
let ttl_ms = self
.entries
.get(key)
.map_or(60_000, |e| e.ttl_ms.max(60_000));
Some(existing) => existing.ttl_ms.max(60_000),
None => 60_000,
};
let entry = LwwEntry {
value: None,
timestamp,
@@ -134,7 +169,7 @@ impl LwwMap {
self.remove_estimated_bytes(key);
self.estimated_bytes += entry.entry_byte_size(key);
self.entries.insert(key.to_string(), entry);
self.lru.lock().remove(key);
self.lru.remove(key);
}
pub fn merge_entry(&mut self, key: String, remote: LwwEntry) -> bool {
@@ -145,10 +180,9 @@ impl LwwMap {
self.remove_estimated_bytes(&key);
self.estimated_bytes += remote.entry_byte_size(&key);
self.entries.insert(key.clone(), remote);
let mut lru = self.lru.lock();
match is_tombstone {
true => lru.remove(&key),
false => lru.promote(&key),
true => self.lru.remove(&key),
false => self.lru.promote(&key),
}
true
}
@@ -175,10 +209,7 @@ impl LwwMap {
expired_keys.iter().for_each(|key| {
self.remove_estimated_bytes(key);
self.entries.remove(key);
});
let mut lru = self.lru.lock();
expired_keys.iter().for_each(|key| {
lru.remove(key);
self.lru.remove(key);
});
}
@@ -192,20 +223,23 @@ impl LwwMap {
expired_keys.iter().for_each(|key| {
self.remove_estimated_bytes(key);
self.entries.remove(key);
});
let mut lru = self.lru.lock();
expired_keys.iter().for_each(|key| {
lru.remove(key);
self.lru.remove(key);
});
}
pub fn evict_lru(&mut self) -> Option<String> {
let key = self.lru.lock().pop_least_recent()?;
let key = self.lru.pop_least_recent()?;
self.remove_estimated_bytes(&key);
self.entries.remove(&key);
Some(key)
}
pub fn touch(&mut self, key: &str) {
if self.entries.contains_key(key) {
self.lru.promote(key);
}
}
pub fn estimated_bytes(&self) -> usize {
self.estimated_bytes
}
@@ -359,14 +393,13 @@ mod tests {
}
#[test]
fn lru_eviction() {
fn lru_eviction_by_write_order() {
let mut map = LwwMap::new();
map.set("k1".into(), b"a".to_vec(), ts(100, 0, 1), 60_000, 100);
map.set("k2".into(), b"b".to_vec(), ts(101, 0, 1), 60_000, 101);
map.set("k3".into(), b"c".to_vec(), ts(102, 0, 1), 60_000, 102);
let _ = map.get("k1", 102);
let evicted = map.evict_lru();
assert_eq!(evicted.as_deref(), Some("k2"));
assert_eq!(evicted.as_deref(), Some("k1"));
}
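With the per-entry LRU mutex gone, a read no longer reorders the LRU by itself; recency changes only through set, merge_entry, or touch, which the sharded store drives from its batched promotion queue during maintenance. A small sketch of the new semantics, assuming the same ts helper the tests above use:

let mut map = LwwMap::new();
map.set("k1".into(), b"a".to_vec(), ts(100, 0, 1), 60_000, 100);
map.set("k2".into(), b"b".to_vec(), ts(101, 0, 1), 60_000, 101);

// A plain get does not promote: k1 is still the oldest write.
let _ = map.get("k1", 101);
// touch() is what promotes now, so after this k2 becomes the eviction candidate.
map.touch("k1");
assert_eq!(map.evict_lru().as_deref(), Some("k2"));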
#[test]

View File

@@ -3,105 +3,214 @@ pub mod hlc;
pub mod lww_map;
pub mod g_counter;
use crate::config::fnv1a;
use delta::CrdtDelta;
use hlc::{Hlc, HlcTimestamp};
use lww_map::LwwMap;
use lww_map::{LwwDelta, LwwMap};
use g_counter::RateLimitStore;
use parking_lot::{Mutex, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
pub struct CrdtStore {
hlc: Hlc,
const SHARD_COUNT: usize = 64;
const MAX_PROMOTIONS_PER_SHARD: usize = 8192;
const MAX_REPLICABLE_VALUE_SIZE: usize = 15 * 1024 * 1024;
struct CrdtShard {
cache: LwwMap,
rate_limits: RateLimitStore,
last_broadcast_ts: HlcTimestamp,
}
impl CrdtStore {
pub fn new(node_id: u64) -> Self {
impl CrdtShard {
fn new(node_id: u64) -> Self {
Self {
hlc: Hlc::new(node_id),
cache: LwwMap::new(),
rate_limits: RateLimitStore::new(node_id),
last_broadcast_ts: HlcTimestamp::ZERO,
}
}
}
pub struct ShardedCrdtStore {
hlc: Mutex<Hlc>,
shards: Box<[RwLock<CrdtShard>]>,
promotions: Box<[Mutex<Vec<String>>]>,
shard_mask: usize,
node_id: u64,
}
impl ShardedCrdtStore {
pub fn new(node_id: u64) -> Self {
const { assert!(SHARD_COUNT.is_power_of_two()) };
let shards: Vec<RwLock<CrdtShard>> = (0..SHARD_COUNT)
.map(|_| RwLock::new(CrdtShard::new(node_id)))
.collect();
let promotions: Vec<Mutex<Vec<String>>> = (0..SHARD_COUNT)
.map(|_| Mutex::new(Vec::new()))
.collect();
Self {
hlc: Mutex::new(Hlc::new(node_id)),
shards: shards.into_boxed_slice(),
promotions: promotions.into_boxed_slice(),
shard_mask: SHARD_COUNT - 1,
node_id,
}
}
fn shard_for(&self, key: &str) -> usize {
fnv1a(key.as_bytes()) as usize & self.shard_mask
}
fn wall_ms_now() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
u64::try_from(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis(),
)
.unwrap_or(u64::MAX)
}
pub fn cache_get(&self, key: &str) -> Option<Vec<u8>> {
self.cache.get(key, Self::wall_ms_now())
let idx = self.shard_for(key);
let result = self.shards[idx].read().cache.get(key, Self::wall_ms_now());
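// Record the hit for later LRU promotion without taking the shard write lock;
// the queue is bounded and drained by run_maintenance.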
if result.is_some() {
let mut promos = self.promotions[idx].lock();
if promos.len() < MAX_PROMOTIONS_PER_SHARD {
promos.push(key.to_string());
}
}
result
}
pub fn cache_set(&mut self, key: String, value: Vec<u8>, ttl_ms: u64) {
let ts = self.hlc.now();
self.cache.set(key, value, ts, ttl_ms, Self::wall_ms_now());
pub fn cache_set(&self, key: String, value: Vec<u8>, ttl_ms: u64) {
if value.len() > MAX_REPLICABLE_VALUE_SIZE {
tracing::warn!(
key = %key,
value_size = value.len(),
max = MAX_REPLICABLE_VALUE_SIZE,
"value exceeds replicable size limit, may fail to replicate to peers"
);
}
let ts = self.hlc.lock().now();
let wall = Self::wall_ms_now();
self.shards[self.shard_for(&key)]
.write()
.cache
.set(key, value, ts, ttl_ms, wall);
}
pub fn cache_delete(&mut self, key: &str) {
let ts = self.hlc.now();
self.cache.delete(key, ts, Self::wall_ms_now());
pub fn cache_delete(&self, key: &str) {
let ts = self.hlc.lock().now();
let wall = Self::wall_ms_now();
self.shards[self.shard_for(key)]
.write()
.cache
.delete(key, ts, wall);
}
pub fn rate_limit_peek(&self, key: &str, window_ms: u64) -> u64 {
self.rate_limits
self.shards[self.shard_for(key)]
.read()
.rate_limits
.peek_count(key, window_ms, Self::wall_ms_now())
}
pub fn rate_limit_check(&mut self, key: &str, limit: u32, window_ms: u64) -> bool {
self.rate_limits
pub fn rate_limit_check(&self, key: &str, limit: u32, window_ms: u64) -> bool {
self.shards[self.shard_for(key)]
.write()
.rate_limits
.check_and_increment(key, limit, window_ms, Self::wall_ms_now())
}
pub fn peek_broadcast_delta(&self) -> CrdtDelta {
let cache_delta = {
let d = self.cache.extract_delta_since(self.last_broadcast_ts);
match d.entries.is_empty() {
true => None,
false => Some(d),
}
let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new();
let mut rate_limit_deltas: Vec<g_counter::GCounterDelta> = Vec::new();
self.shards.iter().for_each(|shard_lock| {
let shard = shard_lock.read();
let lww_delta = shard.cache.extract_delta_since(shard.last_broadcast_ts);
cache_entries.extend(lww_delta.entries);
rate_limit_deltas.extend(shard.rate_limits.extract_dirty_deltas());
});
let cache_delta = match cache_entries.is_empty() {
true => None,
false => Some(LwwDelta { entries: cache_entries }),
};
let rate_limit_deltas = self.rate_limits.extract_dirty_deltas();
CrdtDelta {
version: 1,
source_node: self.hlc.node_id(),
source_node: self.node_id,
cache_delta,
rate_limit_deltas,
}
}
pub fn commit_broadcast(&mut self, delta: &CrdtDelta) {
let max_ts = delta
pub fn commit_broadcast(&self, delta: &CrdtDelta) {
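// Advance each shard's broadcast watermark to the newest timestamp it contributed to this
// delta, and clear a rate-limit counter's dirty flag only if it has not changed since the
// delta was built, so increments made during the broadcast are not lost.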
let cache_entries_by_shard: Vec<(usize, &HlcTimestamp)> = delta
.cache_delta
.as_ref()
.and_then(|d| d.entries.iter().map(|(_, e)| e.timestamp).max())
.unwrap_or(self.last_broadcast_ts);
self.last_broadcast_ts = max_ts;
let committed_keys: std::collections::HashSet<&str> = delta
.map(|d| {
d.entries
.iter()
.map(|(key, entry)| (self.shard_for(key), &entry.timestamp))
.collect()
})
.unwrap_or_default();
let mut max_ts_per_shard: Vec<Option<HlcTimestamp>> = (0..self.shards.len())
.map(|_| None)
.collect();
cache_entries_by_shard.iter().for_each(|&(shard_idx, ts)| {
let slot = &mut max_ts_per_shard[shard_idx];
*slot = Some(match slot {
Some(existing) if *existing >= *ts => *existing,
_ => *ts,
});
});
let rl_index: std::collections::HashMap<&str, &g_counter::GCounter> = delta
.rate_limit_deltas
.iter()
.map(|d| d.key.as_str())
.map(|d| (d.key.as_str(), &d.counter))
.collect();
committed_keys.iter().for_each(|&key| {
let still_matches = self
.rate_limits
.peek_dirty_counter(key)
.zip(delta.rate_limit_deltas.iter().find(|d| d.key == key))
.is_some_and(|(current, committed)| {
current.window_start_ms == committed.counter.window_start_ms
&& current.total() == committed.counter.total()
});
if still_matches {
self.rate_limits.clear_single_dirty(key);
let mut shard_rl_keys: Vec<Vec<&str>> = (0..self.shards.len())
.map(|_| Vec::new())
.collect();
rl_index.keys().for_each(|&key| {
shard_rl_keys[self.shard_for(key)].push(key);
});
self.shards.iter().enumerate().for_each(|(idx, shard_lock)| {
let has_cache_update = max_ts_per_shard[idx].is_some();
let has_rl_keys = !shard_rl_keys[idx].is_empty();
if !has_cache_update && !has_rl_keys {
return;
}
let mut shard = shard_lock.write();
if let Some(max_ts) = max_ts_per_shard[idx] {
shard.last_broadcast_ts = max_ts;
}
shard_rl_keys[idx].iter().for_each(|&key| {
let still_matches = shard
.rate_limits
.peek_dirty_counter(key)
.zip(rl_index.get(key))
.is_some_and(|(current, committed)| {
current.window_start_ms == committed.window_start_ms
&& current.total() == committed.total()
});
if still_matches {
shard.rate_limits.clear_single_dirty(key);
}
});
});
}
pub fn merge_delta(&mut self, delta: &CrdtDelta) -> bool {
pub fn merge_delta(&self, delta: &CrdtDelta) -> bool {
if !delta.is_compatible() {
tracing::warn!(
version = delta.version,
@@ -109,43 +218,136 @@ impl CrdtStore {
);
return false;
}
let mut changed = false;
if let Some(ref cache_delta) = delta.cache_delta {
if let Some(max_ts) = cache_delta.entries.iter().map(|(_, e)| e.timestamp).max() {
let _ = self.hlc.lock().receive(max_ts);
}
}
let mut changed = false;
if let Some(ref cache_delta) = delta.cache_delta {
let mut entries_by_shard: Vec<Vec<(String, lww_map::LwwEntry)>> =
(0..self.shards.len()).map(|_| Vec::new()).collect();
cache_delta.entries.iter().for_each(|(key, entry)| {
let _ = self.hlc.receive(entry.timestamp);
if self.cache.merge_entry(key.clone(), entry.clone()) {
changed = true;
entries_by_shard[self.shard_for(key)].push((key.clone(), entry.clone()));
});
entries_by_shard.into_iter().enumerate().for_each(|(idx, entries)| {
if entries.is_empty() {
return;
}
let mut shard = self.shards[idx].write();
entries.into_iter().for_each(|(key, entry)| {
if shard.cache.merge_entry(key, entry) {
changed = true;
}
});
});
}
delta.rate_limit_deltas.iter().for_each(|rd| {
if self
.rate_limits
.merge_counter(rd.key.clone(), &rd.counter)
{
changed = true;
}
});
if !delta.rate_limit_deltas.is_empty() {
let mut rl_by_shard: Vec<Vec<(String, &g_counter::GCounter)>> =
(0..self.shards.len()).map(|_| Vec::new()).collect();
delta.rate_limit_deltas.iter().for_each(|rd| {
rl_by_shard[self.shard_for(&rd.key)].push((rd.key.clone(), &rd.counter));
});
rl_by_shard.into_iter().enumerate().for_each(|(idx, entries)| {
if entries.is_empty() {
return;
}
let mut shard = self.shards[idx].write();
entries.into_iter().for_each(|(key, counter)| {
if shard.rate_limits.merge_counter(key, counter) {
changed = true;
}
});
});
}
changed
}
pub fn run_maintenance(&mut self) {
pub fn run_maintenance(&self) {
let now = Self::wall_ms_now();
self.cache.gc_tombstones(now);
self.cache.gc_expired(now);
self.rate_limits.gc_expired(now);
self.shards.iter().enumerate().for_each(|(idx, shard_lock)| {
let pending: Vec<String> = self.promotions[idx].lock().drain(..).collect();
let mut shard = shard_lock.write();
pending.iter().for_each(|key| shard.cache.touch(key));
shard.cache.gc_tombstones(now);
shard.cache.gc_expired(now);
shard.rate_limits.gc_expired(now);
});
}
pub fn peek_full_state(&self) -> CrdtDelta {
let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new();
let mut rate_limit_deltas: Vec<g_counter::GCounterDelta> = Vec::new();
self.shards.iter().for_each(|shard_lock| {
let shard = shard_lock.read();
let lww_delta = shard.cache.extract_delta_since(HlcTimestamp::ZERO);
cache_entries.extend(lww_delta.entries);
rate_limit_deltas.extend(shard.rate_limits.extract_all_deltas());
});
let cache_delta = match cache_entries.is_empty() {
true => None,
false => Some(LwwDelta { entries: cache_entries }),
};
CrdtDelta {
version: 1,
source_node: self.node_id,
cache_delta,
rate_limit_deltas,
}
}
pub fn cache_estimated_bytes(&self) -> usize {
self.cache.estimated_bytes()
self.shards
.iter()
.map(|s| s.read().cache.estimated_bytes())
.fold(0usize, usize::saturating_add)
}
pub fn rate_limit_estimated_bytes(&self) -> usize {
self.rate_limits.estimated_bytes()
self.shards
.iter()
.map(|s| s.read().rate_limits.estimated_bytes())
.fold(0usize, usize::saturating_add)
}
pub fn evict_lru(&mut self) -> Option<String> {
self.cache.evict_lru()
pub fn total_estimated_bytes(&self) -> usize {
self.shards
.iter()
.map(|s| {
let shard = s.read();
shard.cache.estimated_bytes().saturating_add(shard.rate_limits.estimated_bytes())
})
.fold(0usize, usize::saturating_add)
}
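// Evict a single entry, starting at start_shard and skipping empty shards; returns the next
// shard to resume from plus the bytes freed, so the budget loop can spread work across shards.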
pub fn evict_lru_round_robin(&self, start_shard: usize) -> Option<(usize, usize)> {
(0..self.shards.len()).find_map(|offset| {
let idx = (start_shard + offset) & self.shard_mask;
let has_entries = self.shards[idx].read().cache.len() > 0;
match has_entries {
true => {
let mut shard = self.shards[idx].write();
let before = shard.cache.estimated_bytes();
shard.cache.evict_lru().map(|_| {
let freed = before.saturating_sub(shard.cache.estimated_bytes());
((idx + 1) & self.shard_mask, freed)
})
}
false => None,
}
})
}
}
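Every public method on ShardedCrdtStore now takes &self and locks only the shard it needs, so the store can sit behind a plain Arc and be used from many threads or tasks at once — the shape the cache and rate-limiter wrappers below rely on. A minimal sketch (thread count and key names are illustrative):

use std::sync::Arc;

let store = Arc::new(ShardedCrdtStore::new(1));
let handles: Vec<_> = (0..4)
    .map(|t| {
        let store = Arc::clone(&store);
        std::thread::spawn(move || {
            // Writers on different keys mostly land on different shards, so they
            // contend on different RwLocks instead of one global lock.
            (0..100).for_each(|i| {
                store.cache_set(format!("t{t}-k{i}"), vec![0u8; 16], 60_000);
            });
        })
    })
    .collect();
handles.into_iter().for_each(|h| h.join().expect("writer thread panicked"));
assert!(store.cache_get("t0-k0").is_some());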
@@ -155,15 +357,15 @@ mod tests {
#[test]
fn roundtrip_cache() {
let mut store = CrdtStore::new(1);
let store = ShardedCrdtStore::new(1);
store.cache_set("key".into(), b"value".to_vec(), 60_000);
assert_eq!(store.cache_get("key"), Some(b"value".to_vec()));
}
#[test]
fn delta_merge_convergence() {
let mut store_a = CrdtStore::new(1);
let mut store_b = CrdtStore::new(2);
let store_a = ShardedCrdtStore::new(1);
let store_b = ShardedCrdtStore::new(2);
store_a.cache_set("x".into(), b"from_a".to_vec(), 60_000);
store_b.cache_set("y".into(), b"from_b".to_vec(), 60_000);
@@ -184,8 +386,8 @@ mod tests {
#[test]
fn rate_limit_across_stores() {
let mut store_a = CrdtStore::new(1);
let mut store_b = CrdtStore::new(2);
let store_a = ShardedCrdtStore::new(1);
let store_b = ShardedCrdtStore::new(2);
store_a.rate_limit_check("rl:test", 5, 60_000);
store_a.rate_limit_check("rl:test", 5, 60_000);
@@ -202,7 +404,7 @@ mod tests {
#[test]
fn incompatible_version_rejected() {
let mut store = CrdtStore::new(1);
let store = ShardedCrdtStore::new(1);
let delta = CrdtDelta {
version: 255,
source_node: 99,

View File

@@ -1,11 +1,11 @@
use crate::cache::RippleCache;
use crate::config::RippleConfig;
use crate::crdt::CrdtStore;
use crate::crdt::ShardedCrdtStore;
use crate::eviction::MemoryBudget;
use crate::gossip::{GossipEngine, PeerId};
use crate::metrics;
use crate::rate_limiter::RippleRateLimiter;
use crate::transport::Transport;
use parking_lot::RwLock;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
@@ -18,7 +18,7 @@ impl RippleEngine {
config: RippleConfig,
shutdown: CancellationToken,
) -> Result<(Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>, SocketAddr), RippleStartError> {
let store = Arc::new(RwLock::new(CrdtStore::new(config.machine_id)));
let store = Arc::new(ShardedCrdtStore::new(config.machine_id));
let (transport, incoming_rx) = Transport::bind(config.bind_addr, config.machine_id, shutdown.clone())
.await
@@ -27,10 +27,18 @@ impl RippleEngine {
let transport = Arc::new(transport);
let bound_addr = transport.local_addr();
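// Derive the gossip identity's generation from wall-clock seconds (mod u32::MAX) so a
// restarted node advertises a different generation than its previous run instead of 0.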
let generation = u32::try_from(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
% u64::from(u32::MAX),
)
.unwrap_or(0);
let local_id = PeerId {
addr: bound_addr,
machine_id: config.machine_id,
generation: 0,
generation,
};
let gossip = GossipEngine::new(transport, store.clone(), local_id);
@@ -51,7 +59,7 @@ impl RippleEngine {
tokio::select! {
_ = eviction_shutdown.cancelled() => break,
_ = interval.tick() => {
budget.enforce(&mut store_for_eviction.write());
budget.enforce(&store_for_eviction);
}
}
}
@@ -74,6 +82,8 @@ impl RippleEngine {
let rate_limiter: Arc<dyn DistributedRateLimiter> =
Arc::new(RippleRateLimiter::new(store));
metrics::describe_metrics();
tracing::info!(
bind = %bound_addr,
machine_id = config.machine_id,

View File

@@ -1,45 +1,72 @@
use crate::crdt::CrdtStore;
use crate::crdt::ShardedCrdtStore;
use crate::metrics;
pub struct MemoryBudget {
max_bytes: usize,
next_shard: std::sync::atomic::AtomicUsize,
}
impl MemoryBudget {
pub fn new(max_bytes: usize) -> Self {
Self { max_bytes }
Self {
max_bytes,
next_shard: std::sync::atomic::AtomicUsize::new(0),
}
}
pub fn enforce(&self, store: &mut CrdtStore) {
pub fn enforce(&self, store: &ShardedCrdtStore) {
store.run_maintenance();
let cache_bytes = store.cache_estimated_bytes();
let rl_bytes = store.rate_limit_estimated_bytes();
metrics::set_cache_bytes(cache_bytes);
metrics::set_rate_limit_bytes(rl_bytes);
let max_bytes = self.max_bytes;
let total_bytes = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes());
let overshoot_ratio = match total_bytes > max_bytes && max_bytes > 0 {
true => total_bytes / max_bytes,
let total_bytes = cache_bytes.saturating_add(rl_bytes);
let overshoot_pct = match total_bytes > max_bytes && max_bytes > 0 {
true => total_bytes.saturating_sub(max_bytes).saturating_mul(100) / max_bytes,
false => 0,
};
const BASE_BATCH: usize = 256;
let batch_size = match overshoot_ratio {
0..=1 => BASE_BATCH,
2..=4 => BASE_BATCH * 4,
let batch_size = match overshoot_pct {
0..=25 => BASE_BATCH,
26..=100 => BASE_BATCH * 4,
_ => BASE_BATCH * 8,
};
let evicted = std::iter::from_fn(|| {
let current = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes());
match current > max_bytes {
true => store.evict_lru(),
false => None,
let mut remaining = total_bytes;
let mut next_shard: usize = self.next_shard.load(std::sync::atomic::Ordering::Relaxed);
let mut evicted: usize = 0;
(0..batch_size).try_for_each(|_| {
match remaining > max_bytes {
true => {
match store.evict_lru_round_robin(next_shard) {
Some((ns, freed)) => {
next_shard = ns;
remaining = remaining.saturating_sub(freed);
evicted += 1;
Ok(())
}
None => Err(()),
}
}
false => Err(()),
}
})
.take(batch_size)
.count();
.ok();
self.next_shard.store(next_shard, std::sync::atomic::Ordering::Relaxed);
if evicted > 0 {
metrics::record_evictions(evicted);
let cache_bytes_after = store.cache_estimated_bytes();
let rl_bytes_after = store.rate_limit_estimated_bytes();
metrics::set_cache_bytes(cache_bytes_after);
metrics::set_rate_limit_bytes(rl_bytes_after);
tracing::info!(
evicted_entries = evicted,
cache_bytes = store.cache_estimated_bytes(),
rate_limit_bytes = store.rate_limit_estimated_bytes(),
cache_bytes = cache_bytes_after,
rate_limit_bytes = rl_bytes_after,
max_bytes = self.max_bytes,
"memory budget eviction"
);
@@ -53,16 +80,16 @@ mod tests {
#[test]
fn eviction_under_budget() {
let mut store = CrdtStore::new(1);
let store = ShardedCrdtStore::new(1);
let budget = MemoryBudget::new(1024 * 1024);
store.cache_set("k".into(), vec![1, 2, 3], 60_000);
budget.enforce(&mut store);
budget.enforce(&store);
assert!(store.cache_get("k").is_some());
}
#[test]
fn eviction_over_budget() {
let mut store = CrdtStore::new(1);
let store = ShardedCrdtStore::new(1);
let budget = MemoryBudget::new(100);
(0..50).for_each(|i| {
store.cache_set(
@@ -71,8 +98,7 @@ mod tests {
60_000,
);
});
budget.enforce(&mut store);
let total = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes());
assert!(total <= 100);
budget.enforce(&store);
assert!(store.total_estimated_bytes() <= 100);
}
}
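The overshoot is now expressed as a percentage rather than an integer ratio, which makes the batch thresholds above easier to reason about (the old total/max ratio truncated 1.8x overshoot down to 1 and kept the base batch). A quick worked example of the new arithmetic (values illustrative):

// max = 100, total = 180  =>  overshoot_pct = (180 - 100) * 100 / 100 = 80   => BASE_BATCH * 4
// max = 100, total = 350  =>  overshoot_pct = 250                            => BASE_BATCH * 8
let max_bytes = 100usize;
let total_bytes = 180usize;
let overshoot_pct = total_bytes.saturating_sub(max_bytes).saturating_mul(100) / max_bytes;
assert_eq!(overshoot_pct, 80);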

View File

@@ -1,8 +1,9 @@
use crate::crdt::delta::CrdtDelta;
use crate::crdt::CrdtStore;
use crate::crdt::lww_map::LwwDelta;
use crate::crdt::ShardedCrdtStore;
use crate::metrics;
use crate::transport::{ChannelTag, IncomingFrame, Transport};
use foca::{Config, Foca, Notification, Runtime, Timer};
use parking_lot::RwLock;
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::collections::HashSet;
@@ -114,6 +115,10 @@ impl MemberTracker {
fn active_peers(&self) -> impl Iterator<Item = SocketAddr> + '_ {
self.active_addrs.iter().copied()
}
fn peer_count(&self) -> usize {
self.active_addrs.len()
}
}
impl Runtime<PeerId> for &mut BufferedRuntime {
@@ -141,14 +146,14 @@ impl Runtime<PeerId> for &mut BufferedRuntime {
pub struct GossipEngine {
transport: Arc<Transport>,
store: Arc<RwLock<CrdtStore>>,
store: Arc<ShardedCrdtStore>,
local_id: PeerId,
}
impl GossipEngine {
pub fn new(
transport: Arc<Transport>,
store: Arc<RwLock<CrdtStore>>,
store: Arc<ShardedCrdtStore>,
local_id: PeerId,
) -> Self {
Self {
@@ -203,11 +208,11 @@ impl GossipEngine {
}
});
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown);
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown);
let mut gossip_tick =
tokio::time::interval(Duration::from_millis(gossip_interval_ms));
let mut maintenance_tick = tokio::time::interval(Duration::from_secs(10));
gossip_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
@@ -222,11 +227,13 @@ impl GossipEngine {
if let Err(e) = foca.handle_data(&frame.data, &mut runtime) {
tracing::warn!(error = %e, "foca handle_data error");
}
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown);
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown);
}
ChannelTag::CrdtSync => {
const MAX_DELTA_ENTRIES: usize = 10_000;
const MAX_DELTA_RATE_LIMITS: usize = 10_000;
metrics::record_gossip_delta_received();
metrics::record_gossip_delta_bytes(frame.data.len());
match bincode::serde::decode_from_slice::<CrdtDelta, _>(&frame.data, bincode::config::standard()) {
Ok((delta, _)) => {
let cache_len = delta.cache_delta.as_ref().map_or(0, |d| d.entries.len());
@@ -235,6 +242,7 @@ impl GossipEngine {
let window_mismatch = delta.rate_limit_deltas.iter().any(|rd| rd.counter.window_duration_ms == 0);
match cache_len > MAX_DELTA_ENTRIES || rl_len > MAX_DELTA_RATE_LIMITS || gcounter_oversize || window_mismatch {
true => {
metrics::record_gossip_drop();
tracing::warn!(
cache_entries = cache_len,
rate_limit_entries = rl_len,
@@ -243,11 +251,14 @@ impl GossipEngine {
);
}
false => {
store.write().merge_delta(&delta);
if store.merge_delta(&delta) {
metrics::record_gossip_merge();
}
}
}
}
Err(e) => {
metrics::record_gossip_drop();
tracing::warn!(error = %e, "failed to decode crdt sync delta");
}
}
@@ -256,72 +267,76 @@ impl GossipEngine {
}
}
_ = gossip_tick.tick() => {
let pending = {
let s = store.read();
let delta = s.peek_broadcast_delta();
match delta.is_empty() {
true => None,
false => {
match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) {
Ok(bytes) => Some((bytes, delta)),
Err(e) => {
tracing::warn!(error = %e, "failed to serialize broadcast delta");
None
}
}
},
}
};
if let Some((ref data, ref delta)) = pending {
let peers: Vec<SocketAddr> = members.active_peers().collect();
let mut all_queued = true;
let cancel = shutdown.clone();
peers.iter().for_each(|&addr| {
match transport.try_queue(addr, ChannelTag::CrdtSync, data) {
true => {}
false => {
all_queued = false;
let t = transport.clone();
let d = data.clone();
let c = cancel.clone();
tokio::spawn(async move {
tokio::select! {
_ = c.cancelled() => {}
_ = t.send(addr, ChannelTag::CrdtSync, &d) => {}
}
});
}
}
});
let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS);
if all_queued || peers.is_empty() || stale {
if stale && !all_queued {
tracing::warn!(
elapsed_secs = last_commit.elapsed().as_secs(),
"force-advancing broadcast watermark (staleness cap)"
);
}
store.write().commit_broadcast(delta);
let delta = store.peek_broadcast_delta();
match delta.is_empty() {
true => {
last_commit = tokio::time::Instant::now();
}
}
false => {
let chunks = chunk_and_serialize(&delta);
match chunks.is_empty() {
true => {
tracing::warn!("all delta chunks failed to serialize, force-committing watermark");
store.commit_broadcast(&delta);
last_commit = tokio::time::Instant::now();
}
false => {
let peers: Vec<SocketAddr> = members.active_peers().collect();
let mut all_queued = true;
let cancel = shutdown.clone();
chunks.iter().for_each(|chunk| {
metrics::record_gossip_delta_bytes(chunk.len());
peers.iter().for_each(|&addr| {
metrics::record_gossip_delta_sent();
match transport.try_queue(addr, ChannelTag::CrdtSync, chunk) {
true => {}
false => {
all_queued = false;
let t = transport.clone();
let d = chunk.clone();
let c = cancel.clone();
tokio::spawn(async move {
tokio::select! {
_ = c.cancelled() => {}
_ = t.send(addr, ChannelTag::CrdtSync, &d) => {}
}
});
}
}
});
});
let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS);
if all_queued || peers.is_empty() || stale {
if stale && !all_queued {
tracing::warn!(
elapsed_secs = last_commit.elapsed().as_secs(),
"force-advancing broadcast watermark (staleness cap)"
);
}
store.commit_broadcast(&delta);
last_commit = tokio::time::Instant::now();
}
}
}
}
};
if let Err(e) = foca.gossip(&mut runtime) {
tracing::warn!(error = %e, "foca gossip error");
}
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown);
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown);
}
Some((timer, _)) = timer_rx.recv() => {
if let Err(e) = foca.handle_timer(timer, &mut runtime) {
tracing::warn!(error = %e, "foca handle_timer error");
}
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown);
drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown);
}
_ = maintenance_tick.tick() => {
store.write().run_maintenance();
_ = tokio::time::sleep(Duration::from_secs(10)) => {
tracing::trace!(
members = foca.num_members(),
cache_bytes = store.read().cache_estimated_bytes(),
"maintenance cycle"
cache_bytes = store.cache_estimated_bytes(),
rate_limit_bytes = store.rate_limit_estimated_bytes(),
"gossip health check"
);
}
}
@@ -331,25 +346,21 @@ impl GossipEngine {
}
fn flush_final_delta(
store: &Arc<RwLock<CrdtStore>>,
store: &Arc<ShardedCrdtStore>,
transport: &Arc<Transport>,
members: &MemberTracker,
) {
let s = store.read();
let delta = s.peek_broadcast_delta();
let delta = store.peek_broadcast_delta();
if delta.is_empty() {
return;
}
match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) {
Ok(bytes) => {
members.active_peers().for_each(|addr| {
let _ = transport.try_queue(addr, ChannelTag::CrdtSync, &bytes);
});
}
Err(e) => {
tracing::warn!(error = %e, "failed to serialize final delta on shutdown");
}
}
let chunks = chunk_and_serialize(&delta);
chunks.iter().for_each(|chunk| {
members.active_peers().for_each(|addr| {
let _ = transport.try_queue(addr, ChannelTag::CrdtSync, chunk);
});
});
store.commit_broadcast(&delta);
}
fn drain_runtime_actions(
@@ -357,6 +368,7 @@ fn drain_runtime_actions(
transport: &Arc<Transport>,
timer_tx: &mpsc::Sender<(Timer<PeerId>, Duration)>,
members: &mut MemberTracker,
store: &Arc<ShardedCrdtStore>,
shutdown: &CancellationToken,
) {
let actions: Vec<RuntimeAction> = runtime.actions.drain(..).collect();
@@ -373,18 +385,105 @@ fn drain_runtime_actions(
}
RuntimeAction::ScheduleTimer(timer, duration) => {
let tx = timer_tx.clone();
let c = shutdown.clone();
tokio::spawn(async move {
tokio::time::sleep(duration).await;
let _ = tx.send((timer, duration)).await;
tokio::select! {
_ = c.cancelled() => {}
_ = tokio::time::sleep(duration) => {
let _ = tx.send((timer, duration)).await;
}
}
});
}
RuntimeAction::MemberUp(addr) => {
tracing::info!(peer = %addr, "member up");
members.member_up(addr);
metrics::set_gossip_peers(members.peer_count());
let snapshot = store.peek_full_state();
if !snapshot.is_empty() {
chunk_and_serialize(&snapshot).into_iter().for_each(|chunk| {
let t = transport.clone();
let c = shutdown.clone();
tokio::spawn(async move {
tokio::select! {
_ = c.cancelled() => {}
_ = t.send(addr, ChannelTag::CrdtSync, &chunk) => {}
}
});
});
}
}
RuntimeAction::MemberDown(addr) => {
tracing::info!(peer = %addr, "member down");
members.member_down(addr);
metrics::set_gossip_peers(members.peer_count());
}
});
}
fn chunk_and_serialize(delta: &CrdtDelta) -> Vec<Vec<u8>> {
let config = bincode::config::standard();
match bincode::serde::encode_to_vec(delta, config) {
Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => vec![bytes],
Ok(_) => split_and_serialize(delta.clone()),
Err(e) => {
tracing::warn!(error = %e, "failed to serialize delta");
vec![]
}
}
}
fn split_and_serialize(delta: CrdtDelta) -> Vec<Vec<u8>> {
let version = delta.version;
let source_node = delta.source_node;
let cache_entries = delta.cache_delta.map_or(Vec::new(), |d| d.entries);
let rl_deltas = delta.rate_limit_deltas;
if cache_entries.is_empty() && rl_deltas.is_empty() {
return vec![];
}
if cache_entries.len() <= 1 && rl_deltas.len() <= 1 {
let mini = CrdtDelta {
version,
source_node,
cache_delta: match cache_entries.is_empty() {
true => None,
false => Some(LwwDelta { entries: cache_entries }),
},
rate_limit_deltas: rl_deltas,
};
match bincode::serde::encode_to_vec(&mini, bincode::config::standard()) {
Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => return vec![bytes],
_ => {
tracing::error!("irreducible delta entry exceeds max frame size, dropping");
return vec![];
}
}
}
let mid_cache = cache_entries.len() / 2;
let mid_rl = rl_deltas.len() / 2;
let mut left_cache = cache_entries;
let right_cache = left_cache.split_off(mid_cache);
let mut left_rl = rl_deltas;
let right_rl = left_rl.split_off(mid_rl);
let make_sub = |entries: Vec<_>, rls| CrdtDelta {
version,
source_node,
cache_delta: match entries.is_empty() {
true => None,
false => Some(LwwDelta { entries }),
},
rate_limit_deltas: rls,
};
let left = make_sub(left_cache, left_rl);
let right = make_sub(right_cache, right_rl);
let mut result = chunk_and_serialize(&left);
result.extend(chunk_and_serialize(&right));
result
}

View File

@@ -4,6 +4,7 @@ pub mod crdt;
pub mod engine;
pub mod eviction;
pub mod gossip;
pub mod metrics;
pub mod rate_limiter;
pub mod transport;

View File

@@ -0,0 +1,108 @@
use metrics::{counter, gauge, histogram};
pub fn describe_metrics() {
metrics::describe_gauge!(
"tranquil_ripple_cache_bytes",
"Estimated memory used by cache entries"
);
metrics::describe_gauge!(
"tranquil_ripple_rate_limit_bytes",
"Estimated memory used by rate limit counters"
);
metrics::describe_gauge!(
"tranquil_ripple_gossip_peers",
"Number of active gossip peers"
);
metrics::describe_counter!(
"tranquil_ripple_cache_hits_total",
"Total cache read hits"
);
metrics::describe_counter!(
"tranquil_ripple_cache_misses_total",
"Total cache read misses"
);
metrics::describe_counter!(
"tranquil_ripple_cache_writes_total",
"Total cache write operations"
);
metrics::describe_counter!(
"tranquil_ripple_cache_deletes_total",
"Total cache delete operations"
);
metrics::describe_counter!(
"tranquil_ripple_evictions_total",
"Total cache entries evicted by memory budget"
);
metrics::describe_counter!(
"tranquil_ripple_gossip_deltas_sent_total",
"Total CRDT delta chunks sent to peers"
);
metrics::describe_counter!(
"tranquil_ripple_gossip_deltas_received_total",
"Total CRDT delta messages received from peers"
);
metrics::describe_counter!(
"tranquil_ripple_gossip_merges_total",
"Total CRDT deltas merged with local state change"
);
metrics::describe_counter!(
"tranquil_ripple_gossip_drops_total",
"Total CRDT deltas dropped (validation or decode failure)"
);
metrics::describe_histogram!(
"tranquil_ripple_gossip_delta_bytes",
"Size of CRDT delta chunks in bytes"
);
}
pub fn record_cache_hit() {
counter!("tranquil_ripple_cache_hits_total").increment(1);
}
pub fn record_cache_miss() {
counter!("tranquil_ripple_cache_misses_total").increment(1);
}
pub fn record_cache_write() {
counter!("tranquil_ripple_cache_writes_total").increment(1);
}
pub fn record_cache_delete() {
counter!("tranquil_ripple_cache_deletes_total").increment(1);
}
pub fn set_cache_bytes(bytes: usize) {
gauge!("tranquil_ripple_cache_bytes").set(bytes as f64);
}
pub fn set_rate_limit_bytes(bytes: usize) {
gauge!("tranquil_ripple_rate_limit_bytes").set(bytes as f64);
}
pub fn set_gossip_peers(count: usize) {
gauge!("tranquil_ripple_gossip_peers").set(count as f64);
}
pub fn record_evictions(count: usize) {
counter!("tranquil_ripple_evictions_total").increment(count as u64);
}
pub fn record_gossip_delta_sent() {
counter!("tranquil_ripple_gossip_deltas_sent_total").increment(1);
}
pub fn record_gossip_delta_received() {
counter!("tranquil_ripple_gossip_deltas_received_total").increment(1);
}
pub fn record_gossip_merge() {
counter!("tranquil_ripple_gossip_merges_total").increment(1);
}
pub fn record_gossip_drop() {
counter!("tranquil_ripple_gossip_drops_total").increment(1);
}
pub fn record_gossip_delta_bytes(bytes: usize) {
histogram!("tranquil_ripple_gossip_delta_bytes").record(bytes as f64);
}
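These helpers go through the metrics facade, so they are no-ops until the embedding binary installs a recorder. A hedged sketch of what that host-side wiring might look like if the process uses the metrics-exporter-prometheus crate (not a dependency added by this commit; the builder call is an assumption about that crate's API):

// Hypothetical host-side setup; tranquil-ripple itself only emits through the facade.
metrics_exporter_prometheus::PrometheusBuilder::new()
    .install()
    .expect("failed to install Prometheus recorder");

tranquil_ripple::metrics::describe_metrics();
tranquil_ripple::metrics::record_cache_hit(); // now actually increments the counter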

View File

@@ -1,15 +1,14 @@
use crate::crdt::CrdtStore;
use crate::crdt::ShardedCrdtStore;
use async_trait::async_trait;
use parking_lot::RwLock;
use std::sync::Arc;
use tranquil_infra::DistributedRateLimiter;
pub struct RippleRateLimiter {
store: Arc<RwLock<CrdtStore>>,
store: Arc<ShardedCrdtStore>,
}
impl RippleRateLimiter {
pub fn new(store: Arc<RwLock<CrdtStore>>) -> Self {
pub fn new(store: Arc<ShardedCrdtStore>) -> Self {
Self { store }
}
}
@@ -17,11 +16,11 @@ impl RippleRateLimiter {
#[async_trait]
impl DistributedRateLimiter for RippleRateLimiter {
async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool {
self.store.write().rate_limit_check(key, limit, window_ms)
self.store.rate_limit_check(key, limit, window_ms)
}
async fn peek_rate_limit_count(&self, key: &str, window_ms: u64) -> u64 {
self.store.read().rate_limit_peek(key, window_ms)
self.store.rate_limit_peek(key, window_ms)
}
}
@@ -31,7 +30,7 @@ mod tests {
#[tokio::test]
async fn rate_limiter_trait_allows_within_limit() {
let store = Arc::new(RwLock::new(CrdtStore::new(1)));
let store = Arc::new(ShardedCrdtStore::new(1));
let rl = RippleRateLimiter::new(store);
assert!(rl.check_rate_limit("test", 5, 60_000).await);
assert!(rl.check_rate_limit("test", 5, 60_000).await);
@@ -39,7 +38,7 @@ mod tests {
#[tokio::test]
async fn rate_limiter_trait_blocks_over_limit() {
let store = Arc::new(RwLock::new(CrdtStore::new(1)));
let store = Arc::new(ShardedCrdtStore::new(1));
let rl = RippleRateLimiter::new(store);
assert!(rl.check_rate_limit("k", 3, 60_000).await);
assert!(rl.check_rate_limit("k", 3, 60_000).await);

View File

@@ -3,15 +3,16 @@ use bytes::{Buf, BufMut, BytesMut};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;
pub(crate) const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;
const MAX_INBOUND_CONNECTIONS: usize = 512;
const MAX_OUTBOUND_CONNECTIONS: usize = 512;
const WRITE_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -44,6 +45,7 @@ pub struct IncomingFrame {
struct ConnectionWriter {
tx: mpsc::Sender<Vec<u8>>,
generation: u64,
}
pub struct Transport {
@@ -51,8 +53,10 @@ pub struct Transport {
_machine_id: u64,
connections: Arc<parking_lot::Mutex<HashMap<SocketAddr, ConnectionWriter>>>,
connecting: Arc<parking_lot::Mutex<std::collections::HashSet<SocketAddr>>>,
conn_generation: Arc<AtomicU64>,
#[allow(dead_code)]
inbound_count: Arc<AtomicUsize>,
outbound_count: Arc<AtomicUsize>,
shutdown: CancellationToken,
incoming_tx: mpsc::Sender<IncomingFrame>,
}
@@ -73,7 +77,9 @@ impl Transport {
_machine_id: machine_id,
connections: Arc::new(parking_lot::Mutex::new(HashMap::new())),
connecting: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())),
conn_generation: Arc::new(AtomicU64::new(0)),
inbound_count: inbound_count.clone(),
outbound_count: Arc::new(AtomicUsize::new(0)),
shutdown: shutdown.clone(),
incoming_tx: incoming_tx.clone(),
};
@@ -99,6 +105,7 @@ impl Transport {
continue;
}
inbound_counter.fetch_add(1, Ordering::Relaxed);
configure_socket(&stream);
Self::spawn_reader(
stream,
peer_addr,
@@ -144,12 +151,20 @@ impl Transport {
};
let writer = {
let conns = self.connections.lock();
conns.get(&target).map(|w| w.tx.clone())
conns.get(&target).map(|w| (w.tx.clone(), w.generation))
};
match writer {
Some(tx) => {
Some((tx, acquired_gen)) => {
if tx.send(frame).await.is_err() {
self.connections.lock().remove(&target);
{
let mut conns = self.connections.lock();
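// Only drop the mapping if it still points at the writer we sent on; a concurrent
// reconnect may already have installed a newer generation for this peer.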
let stale = conns
.get(&target)
.is_some_and(|w| w.generation == acquired_gen);
if stale {
conns.remove(&target);
}
}
self.connect_and_send(target, tag, data).await;
}
}
@@ -163,7 +178,7 @@ impl Transport {
{
let mut connecting = self.connecting.lock();
if connecting.contains(&target) {
tracing::debug!(peer = %target, "connection already in-flight, dropping frame");
tracing::warn!(peer = %target, "connection already in-flight, dropping frame");
return;
}
connecting.insert(target);
@@ -191,17 +206,39 @@ impl Transport {
.await;
match stream {
Ok(stream) => {
if self.outbound_count.load(Ordering::Relaxed) >= MAX_OUTBOUND_CONNECTIONS {
tracing::warn!(
peer = %target,
max = MAX_OUTBOUND_CONNECTIONS,
"outbound connection limit reached, dropping"
);
return;
}
self.outbound_count.fetch_add(1, Ordering::Relaxed);
configure_socket(&stream);
let (read_half, write_half) = stream.into_split();
let (write_tx, mut write_rx) = mpsc::channel::<Vec<u8>>(1024);
let cancel = self.shutdown.clone();
let conn_gen = self.conn_generation.fetch_add(1, Ordering::Relaxed);
self.connections.lock().insert(
target,
ConnectionWriter { tx: write_tx.clone(), generation: conn_gen },
);
if let Some(frame) = encode_frame(tag, data) {
let _ = write_tx.try_send(frame);
}
let conn_cancel = self.shutdown.child_token();
let reader_cancel = conn_cancel.clone();
let connections = self.connections.clone();
let outbound_counter = self.outbound_count.clone();
let peer = target;
tokio::spawn(async move {
let mut writer = write_half;
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = conn_cancel.cancelled() => break,
msg = write_rx.recv() => {
match msg {
Some(buf) => {
@@ -227,19 +264,11 @@ impl Transport {
}
}
connections.lock().remove(&peer);
outbound_counter.fetch_sub(1, Ordering::Relaxed);
conn_cancel.cancel();
});
Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), self.shutdown.clone());
let frame = match encode_frame(tag, data) {
Some(f) => f,
None => return,
};
let _ = write_tx.send(frame).await;
self.connections.lock().insert(
target,
ConnectionWriter { tx: write_tx },
);
Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), reader_cancel);
tracing::debug!(peer = %target, "established outbound connection");
}
Err(e) => {
@@ -309,6 +338,7 @@ impl Transport {
}
}
}
cancel.cancel();
});
}
@@ -335,6 +365,19 @@ impl Transport {
}
}
fn configure_socket(stream: &TcpStream) {
let sock_ref = socket2::SockRef::from(stream);
if let Err(e) = sock_ref.set_tcp_nodelay(true) {
tracing::warn!(error = %e, "failed to set TCP_NODELAY");
}
let params = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30))
.with_interval(Duration::from_secs(10));
if let Err(e) = sock_ref.set_tcp_keepalive(&params) {
tracing::warn!(error = %e, "failed to set TCP keepalive");
}
}
fn encode_frame(tag: ChannelTag, data: &[u8]) -> Option<Vec<u8>> {
match data.len() > MAX_FRAME_SIZE {
true => {

View File

@@ -608,3 +608,163 @@ async fn two_node_rate_limit_split_increment() {
shutdown.cancel();
}
#[tokio::test]
async fn two_node_partition_recovery() {
tracing_subscriber::fmt()
.with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG)
.with_test_writer()
.try_init()
.ok();
let shutdown = CancellationToken::new();
let config_a = RippleConfig {
bind_addr: "127.0.0.1:0".parse().unwrap(),
seed_peers: vec![],
machine_id: 100,
gossip_interval_ms: 100,
cache_max_bytes: 64 * 1024 * 1024,
};
let (cache_a, _rl_a, addr_a) = RippleEngine::start(config_a, shutdown.clone())
.await
.expect("node A failed to start");
futures::future::join_all((0..50).map(|i| {
let cache = cache_a.clone();
async move {
cache
.set(
&format!("pre-{i}"),
&format!("val-{i}"),
Duration::from_secs(300),
)
.await
.expect("set on A failed");
}
}))
.await;
let config_b = RippleConfig {
bind_addr: "127.0.0.1:0".parse().unwrap(),
seed_peers: vec![addr_a],
machine_id: 200,
gossip_interval_ms: 100,
cache_max_bytes: 64 * 1024 * 1024,
};
let (cache_b, _rl_b, _addr_b) = RippleEngine::start(config_b, shutdown.clone())
.await
.expect("node B failed to start");
let b = cache_b.clone();
poll_until(15_000, 200, move || {
let b = b.clone();
async move {
futures::future::join_all((0..50).map(|i| {
let b = b.clone();
async move { b.get(&format!("pre-{i}")).await.is_some() }
}))
.await
.into_iter()
.all(|present| present)
}
})
.await;
futures::future::join_all((0..50).map(|i| {
let cache = cache_b.clone();
async move {
cache
.set(
&format!("post-{i}"),
&format!("bval-{i}"),
Duration::from_secs(300),
)
.await
.expect("set on B failed");
}
}))
.await;
let a = cache_a.clone();
poll_until(15_000, 200, move || {
let a = a.clone();
async move {
futures::future::join_all((0..50).map(|i| {
let a = a.clone();
async move { a.get(&format!("post-{i}")).await.is_some() }
}))
.await
.into_iter()
.all(|present| present)
}
})
.await;
shutdown.cancel();
}
#[tokio::test]
async fn two_node_stress_concurrent_load() {
tracing_subscriber::fmt()
.with_max_level(tracing_subscriber::filter::LevelFilter::INFO)
.with_test_writer()
.try_init()
.ok();
let shutdown = CancellationToken::new();
let ((cache_a, rl_a), (cache_b, rl_b)) = spawn_pair(shutdown.clone()).await;
let tasks: Vec<tokio::task::JoinHandle<()>> = (0u32..8).map(|task_id| {
let cache = match task_id < 4 {
true => cache_a.clone(),
false => cache_b.clone(),
};
let rl = match task_id < 4 {
true => rl_a.clone(),
false => rl_b.clone(),
};
tokio::spawn(async move {
let value = vec![0xABu8; 1024];
futures::future::join_all((0u32..500).map(|op| {
let cache = cache.clone();
let rl = rl.clone();
let value = value.clone();
async move {
let key_idx = op % 100;
let key = format!("stress-{task_id}-{key_idx}");
match op % 4 {
0 | 1 => {
cache
.set_bytes(&key, &value, Duration::from_secs(120))
.await
.expect("set_bytes failed");
}
2 => {
let _ = cache.get(&key).await;
}
_ => {
let _ = rl.check_rate_limit(&key, 1000, 60_000).await;
}
}
}
}))
.await;
})
}).collect();
let results = tokio::time::timeout(
Duration::from_secs(30),
futures::future::join_all(tasks),
)
.await
.expect("stress test timed out after 30s");
results.into_iter().enumerate().for_each(|(i, r)| {
r.unwrap_or_else(|e| panic!("task {i} panicked: {e}"));
});
tokio::time::sleep(Duration::from_secs(12)).await;
shutdown.cancel();
}