From 562f970bc3af92585cb6990fa911fb320dd8a89a Mon Sep 17 00:00:00 2001 From: Lewis Date: Wed, 10 Jun 2026 09:23:22 +0300 Subject: [PATCH] ripple: transport backpressure & connect coalesing Lewis: May this revision serve well! --- crates/tranquil-ripple/src/metrics.rs | 16 + crates/tranquil-ripple/src/transport.rs | 506 +++++++++++++++++++++++- 2 files changed, 502 insertions(+), 20 deletions(-) diff --git a/crates/tranquil-ripple/src/metrics.rs b/crates/tranquil-ripple/src/metrics.rs index 34ae8f6..cd9abaf 100644 --- a/crates/tranquil-ripple/src/metrics.rs +++ b/crates/tranquil-ripple/src/metrics.rs @@ -50,6 +50,14 @@ pub fn describe_metrics() { "tranquil_ripple_gossip_delta_bytes", "Size of CRDT delta chunks in bytes" ); + metrics::describe_counter!( + "tranquil_ripple_transport_write_failures_total", + "Total outbound frame writes that failed or timed out" + ); + metrics::describe_counter!( + "tranquil_ripple_transport_inbound_dropped_total", + "Total inbound frames dropped because the buffer budget was saturated" + ); } pub fn record_cache_hit() { @@ -103,3 +111,11 @@ pub fn record_gossip_drop() { pub fn record_gossip_delta_bytes(bytes: usize) { histogram!("tranquil_ripple_gossip_delta_bytes").record(bytes as f64); } + +pub fn record_transport_write_failure() { + counter!("tranquil_ripple_transport_write_failures_total").increment(1); +} + +pub fn record_transport_inbound_dropped() { + counter!("tranquil_ripple_transport_inbound_dropped_total").increment(1); +} diff --git a/crates/tranquil-ripple/src/transport.rs b/crates/tranquil-ripple/src/transport.rs index db4eee0..dcb78ba 100644 --- a/crates/tranquil-ripple/src/transport.rs +++ b/crates/tranquil-ripple/src/transport.rs @@ -6,17 +6,20 @@ use quinn::{ }; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use std::collections::HashMap; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; -use tokio::sync::{Semaphore, mpsc}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, watch}; use tokio_util::sync::CancellationToken; pub(crate) const MAX_FRAME_SIZE: usize = 4 * 1024 * 1024; const MAX_INBOUND_CONNECTIONS: usize = 512; const MAX_OUTBOUND_CONNECTIONS: usize = 512; +const MAX_QUEUED_WRITES: usize = 1024; const MAX_CONCURRENT_UNI_STREAMS: u32 = 64; +const INBOUND_BYTE_BUDGET: usize = 128 * 1024 * 1024; +const MAX_READS_PER_PEER: usize = 32; const READ_CHUNK_BYTES: usize = 256 * 1024; const INCOMING_CHANNEL_DEPTH: usize = 1024; const STREAM_RECEIVE_WINDOW: u32 = MAX_FRAME_SIZE as u32; @@ -24,6 +27,7 @@ const CONNECTION_RECEIVE_WINDOW: u32 = 16 * 1024 * 1024; const KEEPALIVE: Duration = Duration::from_secs(20); const IDLE_TIMEOUT: Duration = Duration::from_secs(60); const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +const CONNECT_JOIN_TIMEOUT: Duration = Duration::from_secs(30); const WRITE_TIMEOUT: Duration = Duration::from_secs(10); const READ_TIMEOUT: Duration = Duration::from_secs(30); const RIPPLE_ALPN: &[u8] = b"ripple/1"; @@ -56,6 +60,7 @@ pub struct IncomingFrame { pub from: SocketAddr, pub tag: ChannelTag, pub data: Vec, + _budget: OwnedSemaphorePermit, } struct PeerConn { @@ -63,13 +68,29 @@ struct PeerConn { generation: u64, } +type ConnectingMap = Arc>>>; + +struct ConnectingGuard { + connecting: ConnectingMap, + target: SocketAddr, +} + +impl Drop for ConnectingGuard { + fn drop(&mut self) { + self.connecting.lock().remove(&self.target); + } +} + pub struct Transport { endpoint: Endpoint, local_addr: SocketAddr, connections: Arc>>, - connecting: Arc>>, + connecting: ConnectingMap, conn_generation: Arc, outbound_permits: Arc, + queue_permits: Arc, + inbound_byte_budget: Arc, + peer_read_limiter: PeerReadLimiter, shutdown: CancellationToken, incoming_tx: mpsc::Sender, } @@ -88,14 +109,19 @@ impl Transport { endpoint.set_default_client_config(client_config); let local_addr = endpoint.local_addr()?; let (incoming_tx, incoming_rx) = mpsc::channel(INCOMING_CHANNEL_DEPTH); + let inbound_byte_budget = Arc::new(Semaphore::new(INBOUND_BYTE_BUDGET)); + let peer_read_limiter = PeerReadLimiter::new(MAX_READS_PER_PEER); let transport = Self { endpoint: endpoint.clone(), local_addr, connections: Arc::new(parking_lot::Mutex::new(HashMap::new())), - connecting: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())), + connecting: Arc::new(parking_lot::Mutex::new(HashMap::new())), conn_generation: Arc::new(AtomicU64::new(0)), outbound_permits: Arc::new(Semaphore::new(MAX_OUTBOUND_CONNECTIONS)), + queue_permits: Arc::new(Semaphore::new(MAX_QUEUED_WRITES)), + inbound_byte_budget: inbound_byte_budget.clone(), + peer_read_limiter: peer_read_limiter.clone(), shutdown: shutdown.clone(), incoming_tx: incoming_tx.clone(), }; @@ -122,6 +148,8 @@ impl Transport { continue; }; let tx = incoming_tx.clone(); + let byte_budget = inbound_byte_budget.clone(); + let limiter = peer_read_limiter.clone(); let conn_cancel = cancel.child_token(); tokio::spawn(async move { let _permit = permit; @@ -129,7 +157,8 @@ impl Transport { Ok(conn) => { let from = conn.remote_address(); tracing::debug!(peer = %from, "accepted inbound connection"); - run_conn_reader(conn, from, tx, conn_cancel).await; + run_conn_reader(conn, from, tx, byte_budget, limiter, conn_cancel) + .await; } Err(e) => tracing::warn!(error = %e, "inbound handshake failed"), } @@ -151,8 +180,12 @@ impl Transport { pub fn try_queue(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) -> bool { let conn = self.connections.lock().get(&target).map(|p| p.conn.clone()); let Some(conn) = conn else { return false }; + let Ok(permit) = self.queue_permits.clone().try_acquire_owned() else { + return false; + }; let data = data.to_vec(); tokio::spawn(async move { + let _permit = permit; if let Err(e) = write_frame(&conn, tag, &data).await { tracing::debug!(error = %e, "queued write failed"); } @@ -161,6 +194,9 @@ impl Transport { } pub async fn send(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) { + let Ok(permit) = self.queue_permits.clone().acquire_owned().await else { + return; + }; let existing = self .connections .lock() @@ -184,21 +220,60 @@ impl Transport { } } } + drop(permit); self.connect_and_send(target, tag, data).await; } async fn connect_and_send(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) { - { - let mut connecting = self.connecting.lock(); - if connecting.contains(&target) { - tracing::warn!(peer = %target, "connection already in-flight, dropping frame"); - return; - } - connecting.insert(target); + enum Role { + Lead(watch::Sender), + Join(watch::Receiver), + } + let role = { + let mut connecting = self.connecting.lock(); + match connecting.get(&target) { + Some(rx) => Role::Join(rx.clone()), + None => { + let (tx, rx) = watch::channel(false); + connecting.insert(target, rx); + Role::Lead(tx) + } + } + }; + match role { + Role::Lead(done) => { + let _guard = ConnectingGuard { + connecting: self.connecting.clone(), + target, + }; + let existing = self.connections.lock().get(&target).map(|p| p.conn.clone()); + match existing { + Some(conn) => { + if let Err(e) = write_frame(&conn, tag, data).await { + tracing::debug!(peer = %target, error = %e, "write on freshly established connection failed"); + } + } + None => self.connect_and_send_inner(target, tag, data).await, + } + let _ = done.send(true); + } + Role::Join(mut done) => { + let _ = + tokio::time::timeout(CONNECT_JOIN_TIMEOUT, done.wait_for(|ready| *ready)).await; + let conn = self.connections.lock().get(&target).map(|p| p.conn.clone()); + match conn { + Some(conn) => { + if let Err(e) = write_frame(&conn, tag, data).await { + tracing::debug!(peer = %target, error = %e, "write after joined connect failed"); + } + } + None => { + crate::metrics::record_transport_write_failure(); + tracing::debug!(peer = %target, "connection attempt failed, dropping frame"); + } + } + } } - - self.connect_and_send_inner(target, tag, data).await; - self.connecting.lock().remove(&target); } async fn connect_and_send_inner(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) { @@ -208,6 +283,7 @@ impl Transport { max = MAX_OUTBOUND_CONNECTIONS, "outbound connection limit reached, dropping" ); + crate::metrics::record_transport_write_failure(); return; }; let shutdown = self.shutdown.clone(); @@ -249,6 +325,8 @@ impl Transport { conn.clone(), target, self.incoming_tx.clone(), + self.inbound_byte_budget.clone(), + self.peer_read_limiter.clone(), reader_cancel.clone(), )); @@ -270,6 +348,7 @@ impl Transport { tracing::debug!(peer = %target, "established outbound connection"); } Err(e) => { + crate::metrics::record_transport_write_failure(); tracing::warn!(peer = %target, error = %e, "failed to connect after retries"); } } @@ -280,6 +359,8 @@ async fn run_conn_reader( conn: Connection, from: SocketAddr, incoming_tx: mpsc::Sender, + byte_budget: Arc, + peer_limiter: PeerReadLimiter, cancel: CancellationToken, ) { loop { @@ -291,6 +372,8 @@ async fn run_conn_reader( recv, from, incoming_tx.clone(), + byte_budget.clone(), + peer_limiter.clone(), )); } Err(_) => break, @@ -301,10 +384,23 @@ async fn run_conn_reader( enum FrameReadError { Oversize, + BudgetExhausted, Stream(String), } -async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sender) { +async fn read_frame( + recv: RecvStream, + from: SocketAddr, + incoming_tx: mpsc::Sender, + byte_budget: Arc, + peer_limiter: PeerReadLimiter, +) { + let Some(_peer_guard) = peer_limiter.try_acquire(from.ip()) else { + crate::metrics::record_transport_inbound_dropped(); + tracing::debug!(peer = %from, "per-peer inbound read limit reached, dropping frame"); + return; + }; + let read = async { let mut recv = recv; let mut tag_byte = [0u8; 1]; @@ -312,6 +408,10 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende .await .map_err(|e| FrameReadError::Stream(e.to_string()))?; let mut data = Vec::with_capacity(READ_CHUNK_BYTES); + let mut budget = byte_budget + .clone() + .try_acquire_many_owned(0) + .expect("acquiring zero permits always succeeds"); loop { match recv.read_chunk(READ_CHUNK_BYTES, true).await { Ok(Some(chunk)) => { @@ -319,19 +419,29 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende if data.len() + len > MAX_FRAME_SIZE { return Err(FrameReadError::Oversize); } + let permit = byte_budget + .clone() + .try_acquire_many_owned(len as u32) + .map_err(|_| FrameReadError::BudgetExhausted)?; + budget.merge(permit); data.extend_from_slice(&chunk.bytes); } Ok(None) => break, Err(e) => return Err(FrameReadError::Stream(e.to_string())), } } - Ok::<(u8, Vec), FrameReadError>((tag_byte[0], data)) + Ok::<(u8, Vec, OwnedSemaphorePermit), FrameReadError>((tag_byte[0], data, budget)) }; match tokio::time::timeout(READ_TIMEOUT, read).await { - Ok(Ok((tag_byte, data))) => match ChannelTag::from_u8(tag_byte) { + Ok(Ok((tag_byte, data, budget))) => match ChannelTag::from_u8(tag_byte) { Some(tag) => { - let frame = IncomingFrame { from, tag, data }; + let frame = IncomingFrame { + from, + tag, + data, + _budget: budget, + }; if let Err(e) = incoming_tx.try_send(frame) { tracing::warn!(peer = %from, error = %e, "incoming frame channel full, dropping frame"); } @@ -339,8 +449,13 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende None => tracing::debug!(tag = tag_byte, "unknown channel tag, dropping frame"), }, Ok(Err(FrameReadError::Oversize)) => { + crate::metrics::record_transport_inbound_dropped(); tracing::debug!(peer = %from, max = MAX_FRAME_SIZE, "inbound frame exceeds max size, dropping"); } + Ok(Err(FrameReadError::BudgetExhausted)) => { + crate::metrics::record_transport_inbound_dropped(); + tracing::debug!(peer = %from, "inbound byte budget saturated, dropping frame"); + } Ok(Err(FrameReadError::Stream(msg))) => { tracing::debug!(peer = %from, error = %msg, "failed reading uni stream"); } @@ -350,6 +465,53 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende } } +#[derive(Clone)] +struct PeerReadLimiter { + counts: Arc>>, + max_per_peer: usize, +} + +impl PeerReadLimiter { + fn new(max_per_peer: usize) -> Self { + Self { + counts: Arc::new(parking_lot::Mutex::new(HashMap::new())), + max_per_peer, + } + } + + fn try_acquire(&self, peer: IpAddr) -> Option { + let mut counts = self.counts.lock(); + let count = counts.entry(peer).or_insert(0); + match *count >= self.max_per_peer { + true => None, + false => { + *count += 1; + Some(PeerReadGuard { + counts: self.counts.clone(), + peer, + }) + } + } + } +} + +struct PeerReadGuard { + counts: Arc>>, + peer: IpAddr, +} + +impl Drop for PeerReadGuard { + fn drop(&mut self) { + let mut counts = self.counts.lock(); + if let Some(count) = counts.get_mut(&self.peer) { + *count -= 1; + if *count == 0 { + counts.remove(&self.peer); + } + } + } +} + async fn write_frame(conn: &Connection, tag: ChannelTag, data: &[u8]) -> std::io::Result<()> { if data.len() > MAX_FRAME_SIZE { tracing::warn!( @@ -357,11 +519,12 @@ async fn write_frame(conn: &Connection, tag: ChannelTag, data: &[u8]) -> std::io max = MAX_FRAME_SIZE, "refusing to send oversized frame" ); + crate::metrics::record_transport_write_failure(); return Ok(()); } let timed_out = || std::io::Error::new(std::io::ErrorKind::TimedOut, "write timeout"); let deadline = tokio::time::Instant::now() + WRITE_TIMEOUT; - match tokio::time::timeout_at(deadline, conn.open_uni()).await { + let result = match tokio::time::timeout_at(deadline, conn.open_uni()).await { Ok(Ok(mut send)) => { let write = async { send.write_all(&[tag as u8]) @@ -385,7 +548,11 @@ async fn write_frame(conn: &Connection, tag: ChannelTag, data: &[u8]) -> std::io } Ok(Err(e)) => Err(std::io::Error::other(e)), Err(_) => Err(timed_out()), + }; + if result.is_err() { + crate::metrics::record_transport_write_failure(); } + result } fn transport_config() -> TransportConfig { @@ -555,6 +722,132 @@ mod tests { shutdown.cancel(); } + #[tokio::test] + async fn max_size_frame_roundtrips_and_oversize_is_refused() { + let shutdown = CancellationToken::new(); + let (sender, _rx_sender) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind sender"); + let (receiver, mut rx_receiver) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind receiver"); + let target = receiver.local_addr(); + + let big = vec![0xABu8; MAX_FRAME_SIZE]; + sender.send(target, ChannelTag::CrdtSync, &big).await; + let frame = tokio::time::timeout(Duration::from_secs(20), rx_receiver.recv()) + .await + .expect("max-size frame arrives before timeout") + .expect("channel open"); + assert_eq!(frame.data.len(), MAX_FRAME_SIZE); + + let oversize = vec![0u8; MAX_FRAME_SIZE + 1]; + sender.send(target, ChannelTag::CrdtSync, &oversize).await; + let res = tokio::time::timeout(Duration::from_secs(2), rx_receiver.recv()).await; + assert!(res.is_err(), "oversize frame must be refused sender-side"); + + shutdown.cancel(); + } + + #[tokio::test] + async fn stalled_streams_capped_per_peer() { + let shutdown = CancellationToken::new(); + let (receiver, _rx_receiver) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind receiver"); + let target = receiver.local_addr(); + + let client_config = build_client_config().expect("client config"); + let mut endpoint = + Endpoint::client("127.0.0.1:0".parse().unwrap()).expect("client endpoint"); + endpoint.set_default_client_config(client_config); + let conn = endpoint + .connect(target, RIPPLE_SERVER_NAME) + .expect("connect") + .await + .expect("handshake"); + + let mut sends = Vec::new(); + for _ in 0..(MAX_READS_PER_PEER + 16) { + let mut s = conn.open_uni().await.expect("open uni"); + s.write_all(&[ChannelTag::Gossip as u8]) + .await + .expect("write tag byte"); + sends.push(s); + } + + let peer_ip: IpAddr = "127.0.0.1".parse().unwrap(); + let held = || { + receiver + .peer_read_limiter + .counts + .lock() + .get(&peer_ip) + .copied() + .unwrap_or(0) + }; + let mut capped = false; + for _ in 0..100 { + if held() == MAX_READS_PER_PEER { + capped = true; + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + assert!( + capped, + "stalled streams from one peer must be capped at MAX_READS_PER_PEER; held={}", + held() + ); + + drop(sends); + endpoint.close(0u32.into(), b"done"); + shutdown.cancel(); + } + + #[tokio::test] + async fn inbound_byte_budget_held_until_consumed() { + let shutdown = CancellationToken::new(); + let (sender, _rx_sender) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind sender"); + let (receiver, _rx_receiver) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind receiver"); + let target = receiver.local_addr(); + + assert_eq!( + receiver.inbound_byte_budget.available_permits(), + INBOUND_BYTE_BUDGET + ); + + let payload = vec![7u8; 1024 * 1024]; + sender.send(target, ChannelTag::CrdtSync, &payload).await; + + let mut held = false; + for _ in 0..100 { + if receiver.inbound_byte_budget.available_permits() + <= INBOUND_BYTE_BUDGET - payload.len() + { + held = true; + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + assert!( + held, + "an undrained inbound frame must hold its byte budget; available={}", + receiver.inbound_byte_budget.available_permits() + ); + + shutdown.cancel(); + } + #[tokio::test] async fn incoming_frame_from_matches_peer_listen_addr() { let shutdown = CancellationToken::new(); @@ -583,4 +876,177 @@ mod tests { shutdown.cancel(); } + + #[tokio::test] + async fn inbound_budget_exhaustion_drops_excess_frames() { + use futures::StreamExt; + + let shutdown = CancellationToken::new(); + let (sender, _rx_sender) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind sender"); + let (receiver, rx_receiver) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind receiver"); + let target = receiver.local_addr(); + + let frame_count = INBOUND_BYTE_BUDGET / MAX_FRAME_SIZE + 1; + let payload = vec![0x5Au8; MAX_FRAME_SIZE]; + futures::stream::iter(0..frame_count) + .for_each(|_| sender.send(target, ChannelTag::CrdtSync, &payload)) + .await; + + let received: Vec = futures::stream::unfold(rx_receiver, |mut rx| async { + tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .ok() + .flatten() + .map(|frame| (frame, rx)) + }) + .collect() + .await; + + assert!( + !received.is_empty(), + "frames within the budget must be delivered" + ); + assert!( + received.len() < frame_count, + "frames beyond the inbound byte budget must be dropped while earlier frames sit unconsumed; sent={frame_count} received={}", + received.len() + ); + + shutdown.cancel(); + } + + #[tokio::test] + async fn concurrent_sends_to_fresh_peer_all_delivered() { + let shutdown = CancellationToken::new(); + let (sender, _rx_sender) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind sender"); + let (receiver, rx_receiver) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind receiver"); + let target = receiver.local_addr(); + + let payloads: Vec> = (0..8u8).map(|i| vec![i; 16]).collect(); + futures::future::join_all( + payloads + .iter() + .map(|p| sender.send(target, ChannelTag::CrdtSync, p)), + ) + .await; + + use futures::StreamExt; + let received: Vec = futures::stream::unfold(rx_receiver, |mut rx| async { + tokio::time::timeout(Duration::from_secs(2), rx.recv()) + .await + .ok() + .flatten() + .map(|frame| (frame, rx)) + }) + .collect() + .await; + + let mut seen: Vec> = received.into_iter().map(|f| f.data).collect(); + seen.sort(); + assert_eq!( + seen, payloads, + "sends racing a fresh connection must join it and deliver every frame" + ); + + shutdown.cancel(); + } + + #[tokio::test] + async fn timed_out_write_resets_stream_instead_of_truncating() { + let shutdown = CancellationToken::new(); + let (sender, _rx_sender) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind sender"); + + let server_config = build_server_config().expect("server config"); + let stalled_server = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap()) + .expect("bind stalled server"); + let target = stalled_server.local_addr().expect("local addr"); + + let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>(); + let reader = tokio::spawn(async move { + let incoming = stalled_server.accept().await.expect("incoming"); + let conn = incoming.await.expect("handshake"); + let mut recv = conn.accept_uni().await.expect("accept uni"); + release_rx.await.expect("release signal"); + recv.read_to_end(MAX_FRAME_SIZE * 2).await + }); + + let payload = vec![0xEEu8; MAX_FRAME_SIZE]; + let started = tokio::time::Instant::now(); + sender.send(target, ChannelTag::CrdtSync, &payload).await; + assert!( + started.elapsed() >= WRITE_TIMEOUT, + "a frame one byte over the stream receive window must stall until the write timeout" + ); + + release_tx.send(()).expect("server task alive"); + let read = reader.await.expect("server task"); + assert!( + read.is_err(), + "a timed-out partial write must reset the stream, not surface a truncated frame; read {} bytes", + read.map(|d| d.len()).unwrap_or(0) + ); + + shutdown.cancel(); + } + + #[tokio::test] + async fn write_timeout_keeps_connection() { + use futures::StreamExt; + + let shutdown = CancellationToken::new(); + let (sender, _rx_sender) = + Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone()) + .await + .expect("bind sender"); + + let server_config = build_server_config().expect("server config"); + let mute_server = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap()) + .expect("bind mute server"); + let target = mute_server.local_addr().expect("local addr"); + let mute = tokio::spawn(async move { + let incoming = mute_server.accept().await.expect("incoming"); + let conn = incoming.await.expect("handshake"); + std::future::pending::<()>().await; + drop(conn); + }); + + futures::stream::iter(0..u64::from(MAX_CONCURRENT_UNI_STREAMS)) + .for_each(|_| sender.send(target, ChannelTag::Gossip, b"fill stream credit")) + .await; + assert!( + sender.connections.lock().contains_key(&target), + "connection must be established before exhausting stream credit" + ); + + let started = tokio::time::Instant::now(); + sender + .send(target, ChannelTag::Gossip, b"blocked frame") + .await; + assert!( + started.elapsed() >= WRITE_TIMEOUT, + "send with exhausted stream credit must block until the write timeout" + ); + assert!( + sender.connections.lock().contains_key(&target), + "a timed-out write must keep the connection registered" + ); + + mute.abort(); + shutdown.cancel(); + } }