ripple: transport backpressure & connect coalesing

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-06-10 09:23:22 +03:00
committed by Tangled
parent 06fd6a1ce9
commit 562f970bc3
2 changed files with 502 additions and 20 deletions
+16
View File
@@ -50,6 +50,14 @@ pub fn describe_metrics() {
"tranquil_ripple_gossip_delta_bytes",
"Size of CRDT delta chunks in bytes"
);
metrics::describe_counter!(
"tranquil_ripple_transport_write_failures_total",
"Total outbound frame writes that failed or timed out"
);
metrics::describe_counter!(
"tranquil_ripple_transport_inbound_dropped_total",
"Total inbound frames dropped because the buffer budget was saturated"
);
}
pub fn record_cache_hit() {
@@ -103,3 +111,11 @@ pub fn record_gossip_drop() {
pub fn record_gossip_delta_bytes(bytes: usize) {
histogram!("tranquil_ripple_gossip_delta_bytes").record(bytes as f64);
}
pub fn record_transport_write_failure() {
counter!("tranquil_ripple_transport_write_failures_total").increment(1);
}
pub fn record_transport_inbound_dropped() {
counter!("tranquil_ripple_transport_inbound_dropped_total").increment(1);
}
+486 -20
View File
@@ -6,17 +6,20 @@ use quinn::{
};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::sync::{Semaphore, mpsc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, watch};
use tokio_util::sync::CancellationToken;
pub(crate) const MAX_FRAME_SIZE: usize = 4 * 1024 * 1024;
const MAX_INBOUND_CONNECTIONS: usize = 512;
const MAX_OUTBOUND_CONNECTIONS: usize = 512;
const MAX_QUEUED_WRITES: usize = 1024;
const MAX_CONCURRENT_UNI_STREAMS: u32 = 64;
const INBOUND_BYTE_BUDGET: usize = 128 * 1024 * 1024;
const MAX_READS_PER_PEER: usize = 32;
const READ_CHUNK_BYTES: usize = 256 * 1024;
const INCOMING_CHANNEL_DEPTH: usize = 1024;
const STREAM_RECEIVE_WINDOW: u32 = MAX_FRAME_SIZE as u32;
@@ -24,6 +27,7 @@ const CONNECTION_RECEIVE_WINDOW: u32 = 16 * 1024 * 1024;
const KEEPALIVE: Duration = Duration::from_secs(20);
const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const CONNECT_JOIN_TIMEOUT: Duration = Duration::from_secs(30);
const WRITE_TIMEOUT: Duration = Duration::from_secs(10);
const READ_TIMEOUT: Duration = Duration::from_secs(30);
const RIPPLE_ALPN: &[u8] = b"ripple/1";
@@ -56,6 +60,7 @@ pub struct IncomingFrame {
pub from: SocketAddr,
pub tag: ChannelTag,
pub data: Vec<u8>,
_budget: OwnedSemaphorePermit,
}
struct PeerConn {
@@ -63,13 +68,29 @@ struct PeerConn {
generation: u64,
}
type ConnectingMap = Arc<parking_lot::Mutex<HashMap<SocketAddr, watch::Receiver<bool>>>>;
struct ConnectingGuard {
connecting: ConnectingMap,
target: SocketAddr,
}
impl Drop for ConnectingGuard {
fn drop(&mut self) {
self.connecting.lock().remove(&self.target);
}
}
pub struct Transport {
endpoint: Endpoint,
local_addr: SocketAddr,
connections: Arc<parking_lot::Mutex<HashMap<SocketAddr, PeerConn>>>,
connecting: Arc<parking_lot::Mutex<std::collections::HashSet<SocketAddr>>>,
connecting: ConnectingMap,
conn_generation: Arc<AtomicU64>,
outbound_permits: Arc<Semaphore>,
queue_permits: Arc<Semaphore>,
inbound_byte_budget: Arc<Semaphore>,
peer_read_limiter: PeerReadLimiter,
shutdown: CancellationToken,
incoming_tx: mpsc::Sender<IncomingFrame>,
}
@@ -88,14 +109,19 @@ impl Transport {
endpoint.set_default_client_config(client_config);
let local_addr = endpoint.local_addr()?;
let (incoming_tx, incoming_rx) = mpsc::channel(INCOMING_CHANNEL_DEPTH);
let inbound_byte_budget = Arc::new(Semaphore::new(INBOUND_BYTE_BUDGET));
let peer_read_limiter = PeerReadLimiter::new(MAX_READS_PER_PEER);
let transport = Self {
endpoint: endpoint.clone(),
local_addr,
connections: Arc::new(parking_lot::Mutex::new(HashMap::new())),
connecting: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())),
connecting: Arc::new(parking_lot::Mutex::new(HashMap::new())),
conn_generation: Arc::new(AtomicU64::new(0)),
outbound_permits: Arc::new(Semaphore::new(MAX_OUTBOUND_CONNECTIONS)),
queue_permits: Arc::new(Semaphore::new(MAX_QUEUED_WRITES)),
inbound_byte_budget: inbound_byte_budget.clone(),
peer_read_limiter: peer_read_limiter.clone(),
shutdown: shutdown.clone(),
incoming_tx: incoming_tx.clone(),
};
@@ -122,6 +148,8 @@ impl Transport {
continue;
};
let tx = incoming_tx.clone();
let byte_budget = inbound_byte_budget.clone();
let limiter = peer_read_limiter.clone();
let conn_cancel = cancel.child_token();
tokio::spawn(async move {
let _permit = permit;
@@ -129,7 +157,8 @@ impl Transport {
Ok(conn) => {
let from = conn.remote_address();
tracing::debug!(peer = %from, "accepted inbound connection");
run_conn_reader(conn, from, tx, conn_cancel).await;
run_conn_reader(conn, from, tx, byte_budget, limiter, conn_cancel)
.await;
}
Err(e) => tracing::warn!(error = %e, "inbound handshake failed"),
}
@@ -151,8 +180,12 @@ impl Transport {
pub fn try_queue(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) -> bool {
let conn = self.connections.lock().get(&target).map(|p| p.conn.clone());
let Some(conn) = conn else { return false };
let Ok(permit) = self.queue_permits.clone().try_acquire_owned() else {
return false;
};
let data = data.to_vec();
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = write_frame(&conn, tag, &data).await {
tracing::debug!(error = %e, "queued write failed");
}
@@ -161,6 +194,9 @@ impl Transport {
}
pub async fn send(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) {
let Ok(permit) = self.queue_permits.clone().acquire_owned().await else {
return;
};
let existing = self
.connections
.lock()
@@ -184,21 +220,60 @@ impl Transport {
}
}
}
drop(permit);
self.connect_and_send(target, tag, data).await;
}
async fn connect_and_send(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) {
{
let mut connecting = self.connecting.lock();
if connecting.contains(&target) {
tracing::warn!(peer = %target, "connection already in-flight, dropping frame");
return;
}
connecting.insert(target);
enum Role {
Lead(watch::Sender<bool>),
Join(watch::Receiver<bool>),
}
let role = {
let mut connecting = self.connecting.lock();
match connecting.get(&target) {
Some(rx) => Role::Join(rx.clone()),
None => {
let (tx, rx) = watch::channel(false);
connecting.insert(target, rx);
Role::Lead(tx)
}
}
};
match role {
Role::Lead(done) => {
let _guard = ConnectingGuard {
connecting: self.connecting.clone(),
target,
};
let existing = self.connections.lock().get(&target).map(|p| p.conn.clone());
match existing {
Some(conn) => {
if let Err(e) = write_frame(&conn, tag, data).await {
tracing::debug!(peer = %target, error = %e, "write on freshly established connection failed");
}
}
None => self.connect_and_send_inner(target, tag, data).await,
}
let _ = done.send(true);
}
Role::Join(mut done) => {
let _ =
tokio::time::timeout(CONNECT_JOIN_TIMEOUT, done.wait_for(|ready| *ready)).await;
let conn = self.connections.lock().get(&target).map(|p| p.conn.clone());
match conn {
Some(conn) => {
if let Err(e) = write_frame(&conn, tag, data).await {
tracing::debug!(peer = %target, error = %e, "write after joined connect failed");
}
}
None => {
crate::metrics::record_transport_write_failure();
tracing::debug!(peer = %target, "connection attempt failed, dropping frame");
}
}
}
}
self.connect_and_send_inner(target, tag, data).await;
self.connecting.lock().remove(&target);
}
async fn connect_and_send_inner(&self, target: SocketAddr, tag: ChannelTag, data: &[u8]) {
@@ -208,6 +283,7 @@ impl Transport {
max = MAX_OUTBOUND_CONNECTIONS,
"outbound connection limit reached, dropping"
);
crate::metrics::record_transport_write_failure();
return;
};
let shutdown = self.shutdown.clone();
@@ -249,6 +325,8 @@ impl Transport {
conn.clone(),
target,
self.incoming_tx.clone(),
self.inbound_byte_budget.clone(),
self.peer_read_limiter.clone(),
reader_cancel.clone(),
));
@@ -270,6 +348,7 @@ impl Transport {
tracing::debug!(peer = %target, "established outbound connection");
}
Err(e) => {
crate::metrics::record_transport_write_failure();
tracing::warn!(peer = %target, error = %e, "failed to connect after retries");
}
}
@@ -280,6 +359,8 @@ async fn run_conn_reader(
conn: Connection,
from: SocketAddr,
incoming_tx: mpsc::Sender<IncomingFrame>,
byte_budget: Arc<Semaphore>,
peer_limiter: PeerReadLimiter,
cancel: CancellationToken,
) {
loop {
@@ -291,6 +372,8 @@ async fn run_conn_reader(
recv,
from,
incoming_tx.clone(),
byte_budget.clone(),
peer_limiter.clone(),
));
}
Err(_) => break,
@@ -301,10 +384,23 @@ async fn run_conn_reader(
enum FrameReadError {
Oversize,
BudgetExhausted,
Stream(String),
}
async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sender<IncomingFrame>) {
async fn read_frame(
recv: RecvStream,
from: SocketAddr,
incoming_tx: mpsc::Sender<IncomingFrame>,
byte_budget: Arc<Semaphore>,
peer_limiter: PeerReadLimiter,
) {
let Some(_peer_guard) = peer_limiter.try_acquire(from.ip()) else {
crate::metrics::record_transport_inbound_dropped();
tracing::debug!(peer = %from, "per-peer inbound read limit reached, dropping frame");
return;
};
let read = async {
let mut recv = recv;
let mut tag_byte = [0u8; 1];
@@ -312,6 +408,10 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende
.await
.map_err(|e| FrameReadError::Stream(e.to_string()))?;
let mut data = Vec::with_capacity(READ_CHUNK_BYTES);
let mut budget = byte_budget
.clone()
.try_acquire_many_owned(0)
.expect("acquiring zero permits always succeeds");
loop {
match recv.read_chunk(READ_CHUNK_BYTES, true).await {
Ok(Some(chunk)) => {
@@ -319,19 +419,29 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende
if data.len() + len > MAX_FRAME_SIZE {
return Err(FrameReadError::Oversize);
}
let permit = byte_budget
.clone()
.try_acquire_many_owned(len as u32)
.map_err(|_| FrameReadError::BudgetExhausted)?;
budget.merge(permit);
data.extend_from_slice(&chunk.bytes);
}
Ok(None) => break,
Err(e) => return Err(FrameReadError::Stream(e.to_string())),
}
}
Ok::<(u8, Vec<u8>), FrameReadError>((tag_byte[0], data))
Ok::<(u8, Vec<u8>, OwnedSemaphorePermit), FrameReadError>((tag_byte[0], data, budget))
};
match tokio::time::timeout(READ_TIMEOUT, read).await {
Ok(Ok((tag_byte, data))) => match ChannelTag::from_u8(tag_byte) {
Ok(Ok((tag_byte, data, budget))) => match ChannelTag::from_u8(tag_byte) {
Some(tag) => {
let frame = IncomingFrame { from, tag, data };
let frame = IncomingFrame {
from,
tag,
data,
_budget: budget,
};
if let Err(e) = incoming_tx.try_send(frame) {
tracing::warn!(peer = %from, error = %e, "incoming frame channel full, dropping frame");
}
@@ -339,8 +449,13 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende
None => tracing::debug!(tag = tag_byte, "unknown channel tag, dropping frame"),
},
Ok(Err(FrameReadError::Oversize)) => {
crate::metrics::record_transport_inbound_dropped();
tracing::debug!(peer = %from, max = MAX_FRAME_SIZE, "inbound frame exceeds max size, dropping");
}
Ok(Err(FrameReadError::BudgetExhausted)) => {
crate::metrics::record_transport_inbound_dropped();
tracing::debug!(peer = %from, "inbound byte budget saturated, dropping frame");
}
Ok(Err(FrameReadError::Stream(msg))) => {
tracing::debug!(peer = %from, error = %msg, "failed reading uni stream");
}
@@ -350,6 +465,53 @@ async fn read_frame(recv: RecvStream, from: SocketAddr, incoming_tx: mpsc::Sende
}
}
#[derive(Clone)]
struct PeerReadLimiter {
counts: Arc<parking_lot::Mutex<HashMap<IpAddr, usize>>>,
max_per_peer: usize,
}
impl PeerReadLimiter {
fn new(max_per_peer: usize) -> Self {
Self {
counts: Arc::new(parking_lot::Mutex::new(HashMap::new())),
max_per_peer,
}
}
fn try_acquire(&self, peer: IpAddr) -> Option<PeerReadGuard> {
let mut counts = self.counts.lock();
let count = counts.entry(peer).or_insert(0);
match *count >= self.max_per_peer {
true => None,
false => {
*count += 1;
Some(PeerReadGuard {
counts: self.counts.clone(),
peer,
})
}
}
}
}
struct PeerReadGuard {
counts: Arc<parking_lot::Mutex<HashMap<IpAddr, usize>>>,
peer: IpAddr,
}
impl Drop for PeerReadGuard {
fn drop(&mut self) {
let mut counts = self.counts.lock();
if let Some(count) = counts.get_mut(&self.peer) {
*count -= 1;
if *count == 0 {
counts.remove(&self.peer);
}
}
}
}
async fn write_frame(conn: &Connection, tag: ChannelTag, data: &[u8]) -> std::io::Result<()> {
if data.len() > MAX_FRAME_SIZE {
tracing::warn!(
@@ -357,11 +519,12 @@ async fn write_frame(conn: &Connection, tag: ChannelTag, data: &[u8]) -> std::io
max = MAX_FRAME_SIZE,
"refusing to send oversized frame"
);
crate::metrics::record_transport_write_failure();
return Ok(());
}
let timed_out = || std::io::Error::new(std::io::ErrorKind::TimedOut, "write timeout");
let deadline = tokio::time::Instant::now() + WRITE_TIMEOUT;
match tokio::time::timeout_at(deadline, conn.open_uni()).await {
let result = match tokio::time::timeout_at(deadline, conn.open_uni()).await {
Ok(Ok(mut send)) => {
let write = async {
send.write_all(&[tag as u8])
@@ -385,7 +548,11 @@ async fn write_frame(conn: &Connection, tag: ChannelTag, data: &[u8]) -> std::io
}
Ok(Err(e)) => Err(std::io::Error::other(e)),
Err(_) => Err(timed_out()),
};
if result.is_err() {
crate::metrics::record_transport_write_failure();
}
result
}
fn transport_config() -> TransportConfig {
@@ -555,6 +722,132 @@ mod tests {
shutdown.cancel();
}
#[tokio::test]
async fn max_size_frame_roundtrips_and_oversize_is_refused() {
let shutdown = CancellationToken::new();
let (sender, _rx_sender) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind sender");
let (receiver, mut rx_receiver) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind receiver");
let target = receiver.local_addr();
let big = vec![0xABu8; MAX_FRAME_SIZE];
sender.send(target, ChannelTag::CrdtSync, &big).await;
let frame = tokio::time::timeout(Duration::from_secs(20), rx_receiver.recv())
.await
.expect("max-size frame arrives before timeout")
.expect("channel open");
assert_eq!(frame.data.len(), MAX_FRAME_SIZE);
let oversize = vec![0u8; MAX_FRAME_SIZE + 1];
sender.send(target, ChannelTag::CrdtSync, &oversize).await;
let res = tokio::time::timeout(Duration::from_secs(2), rx_receiver.recv()).await;
assert!(res.is_err(), "oversize frame must be refused sender-side");
shutdown.cancel();
}
#[tokio::test]
async fn stalled_streams_capped_per_peer() {
let shutdown = CancellationToken::new();
let (receiver, _rx_receiver) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind receiver");
let target = receiver.local_addr();
let client_config = build_client_config().expect("client config");
let mut endpoint =
Endpoint::client("127.0.0.1:0".parse().unwrap()).expect("client endpoint");
endpoint.set_default_client_config(client_config);
let conn = endpoint
.connect(target, RIPPLE_SERVER_NAME)
.expect("connect")
.await
.expect("handshake");
let mut sends = Vec::new();
for _ in 0..(MAX_READS_PER_PEER + 16) {
let mut s = conn.open_uni().await.expect("open uni");
s.write_all(&[ChannelTag::Gossip as u8])
.await
.expect("write tag byte");
sends.push(s);
}
let peer_ip: IpAddr = "127.0.0.1".parse().unwrap();
let held = || {
receiver
.peer_read_limiter
.counts
.lock()
.get(&peer_ip)
.copied()
.unwrap_or(0)
};
let mut capped = false;
for _ in 0..100 {
if held() == MAX_READS_PER_PEER {
capped = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
capped,
"stalled streams from one peer must be capped at MAX_READS_PER_PEER; held={}",
held()
);
drop(sends);
endpoint.close(0u32.into(), b"done");
shutdown.cancel();
}
#[tokio::test]
async fn inbound_byte_budget_held_until_consumed() {
let shutdown = CancellationToken::new();
let (sender, _rx_sender) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind sender");
let (receiver, _rx_receiver) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind receiver");
let target = receiver.local_addr();
assert_eq!(
receiver.inbound_byte_budget.available_permits(),
INBOUND_BYTE_BUDGET
);
let payload = vec![7u8; 1024 * 1024];
sender.send(target, ChannelTag::CrdtSync, &payload).await;
let mut held = false;
for _ in 0..100 {
if receiver.inbound_byte_budget.available_permits()
<= INBOUND_BYTE_BUDGET - payload.len()
{
held = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
held,
"an undrained inbound frame must hold its byte budget; available={}",
receiver.inbound_byte_budget.available_permits()
);
shutdown.cancel();
}
#[tokio::test]
async fn incoming_frame_from_matches_peer_listen_addr() {
let shutdown = CancellationToken::new();
@@ -583,4 +876,177 @@ mod tests {
shutdown.cancel();
}
#[tokio::test]
async fn inbound_budget_exhaustion_drops_excess_frames() {
use futures::StreamExt;
let shutdown = CancellationToken::new();
let (sender, _rx_sender) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind sender");
let (receiver, rx_receiver) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind receiver");
let target = receiver.local_addr();
let frame_count = INBOUND_BYTE_BUDGET / MAX_FRAME_SIZE + 1;
let payload = vec![0x5Au8; MAX_FRAME_SIZE];
futures::stream::iter(0..frame_count)
.for_each(|_| sender.send(target, ChannelTag::CrdtSync, &payload))
.await;
let received: Vec<IncomingFrame> = futures::stream::unfold(rx_receiver, |mut rx| async {
tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.ok()
.flatten()
.map(|frame| (frame, rx))
})
.collect()
.await;
assert!(
!received.is_empty(),
"frames within the budget must be delivered"
);
assert!(
received.len() < frame_count,
"frames beyond the inbound byte budget must be dropped while earlier frames sit unconsumed; sent={frame_count} received={}",
received.len()
);
shutdown.cancel();
}
#[tokio::test]
async fn concurrent_sends_to_fresh_peer_all_delivered() {
let shutdown = CancellationToken::new();
let (sender, _rx_sender) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind sender");
let (receiver, rx_receiver) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind receiver");
let target = receiver.local_addr();
let payloads: Vec<Vec<u8>> = (0..8u8).map(|i| vec![i; 16]).collect();
futures::future::join_all(
payloads
.iter()
.map(|p| sender.send(target, ChannelTag::CrdtSync, p)),
)
.await;
use futures::StreamExt;
let received: Vec<IncomingFrame> = futures::stream::unfold(rx_receiver, |mut rx| async {
tokio::time::timeout(Duration::from_secs(2), rx.recv())
.await
.ok()
.flatten()
.map(|frame| (frame, rx))
})
.collect()
.await;
let mut seen: Vec<Vec<u8>> = received.into_iter().map(|f| f.data).collect();
seen.sort();
assert_eq!(
seen, payloads,
"sends racing a fresh connection must join it and deliver every frame"
);
shutdown.cancel();
}
#[tokio::test]
async fn timed_out_write_resets_stream_instead_of_truncating() {
let shutdown = CancellationToken::new();
let (sender, _rx_sender) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind sender");
let server_config = build_server_config().expect("server config");
let stalled_server = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
.expect("bind stalled server");
let target = stalled_server.local_addr().expect("local addr");
let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();
let reader = tokio::spawn(async move {
let incoming = stalled_server.accept().await.expect("incoming");
let conn = incoming.await.expect("handshake");
let mut recv = conn.accept_uni().await.expect("accept uni");
release_rx.await.expect("release signal");
recv.read_to_end(MAX_FRAME_SIZE * 2).await
});
let payload = vec![0xEEu8; MAX_FRAME_SIZE];
let started = tokio::time::Instant::now();
sender.send(target, ChannelTag::CrdtSync, &payload).await;
assert!(
started.elapsed() >= WRITE_TIMEOUT,
"a frame one byte over the stream receive window must stall until the write timeout"
);
release_tx.send(()).expect("server task alive");
let read = reader.await.expect("server task");
assert!(
read.is_err(),
"a timed-out partial write must reset the stream, not surface a truncated frame; read {} bytes",
read.map(|d| d.len()).unwrap_or(0)
);
shutdown.cancel();
}
#[tokio::test]
async fn write_timeout_keeps_connection() {
use futures::StreamExt;
let shutdown = CancellationToken::new();
let (sender, _rx_sender) =
Transport::bind("127.0.0.1:0".parse().unwrap(), shutdown.clone())
.await
.expect("bind sender");
let server_config = build_server_config().expect("server config");
let mute_server = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
.expect("bind mute server");
let target = mute_server.local_addr().expect("local addr");
let mute = tokio::spawn(async move {
let incoming = mute_server.accept().await.expect("incoming");
let conn = incoming.await.expect("handshake");
std::future::pending::<()>().await;
drop(conn);
});
futures::stream::iter(0..u64::from(MAX_CONCURRENT_UNI_STREAMS))
.for_each(|_| sender.send(target, ChannelTag::Gossip, b"fill stream credit"))
.await;
assert!(
sender.connections.lock().contains_key(&target),
"connection must be established before exhausting stream credit"
);
let started = tokio::time::Instant::now();
sender
.send(target, ChannelTag::Gossip, b"blocked frame")
.await;
assert!(
started.elapsed() >= WRITE_TIMEOUT,
"send with exhausted stream credit must block until the write timeout"
);
assert!(
sender.connections.lock().contains_key(&target),
"a timed-out write must keep the connection registered"
);
mute.abort();
shutdown.cancel();
}
}