test/cluster: add test for per-service-level transport request latency histogram

Verify that the new scylla_transport_cql_request_latency_histogram metric
correctly records transport-level request latencies per service level.

Uses error injection to pause a request mid-flight and verifies that the
histogram is not updated while the request is paused (since the response
has not been written yet), and is updated after the request completes.

Co-authored-by: Marcin Maliszkiewicz <marcinmal@scylladb.com>
This commit is contained in:
Piotr Smaron
2026-05-15 12:18:04 +02:00
committed by Marcin Maliszkiewicz
parent f90a3296cf
commit 810ed6eedc

View File

@@ -644,3 +644,86 @@ async def test_per_service_level_cql_requests_serving(manager: ManagerClient) ->
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
safe_driver_shutdown(cluster_a)
safe_driver_shutdown(cluster_default)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_per_service_level_cql_request_latency_histogram(manager: ManagerClient) -> None:
"""Test that the per-service-level cql_request_latency_histogram metric
records transport-level request latencies for each service level.
Pauses a query on a custom service level and verifies that the histogram
is updated only after the response is sent.
"""
server = await manager.server_add(config=auth_config)
cql, _ = await manager.get_ready_cql([server])
# Create a service level and a user attached to it.
sl_a = unique_name()
await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
await cql.run_async(f"CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")
cluster_a = manager.con_gen([server.ip_addr],
manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a'))
cluster_a.schema_metadata_enabled = False
cluster_a.token_metadata_enabled = False
session_a = cluster_a.connect()
query_task = None
def metric_value(metrics, suffix):
value = metrics.get(f"scylla_transport_cql_request_latency_histogram_{suffix}",
{'scheduling_group_name': f'sl:{sl_a}'})
return 0 if value is None else value
try:
# Warm up the session before enabling the injection, so the paused query
# is not mixed with connection initialization.
session_a.execute("SELECT key FROM system.local")
metrics = await manager.metrics.query(server.ip_addr)
baseline_count = metric_value(metrics, "count")
baseline_sum = metric_value(metrics, "sum")
await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False)
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
query_task = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT key FROM system.local"))
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
metrics = await manager.metrics.query(server.ip_addr)
paused_count = metric_value(metrics, "count")
await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
await query_task
assert paused_count == baseline_count, \
f"Expected no new latency sample while request is paused, got {paused_count - baseline_count}"
async def latency_sample_recorded():
metrics = await manager.metrics.query(server.ip_addr)
latency_count = metric_value(metrics, "count")
latency_sum = metric_value(metrics, "sum")
assert latency_count >= baseline_count + 1, \
f"Expected at least one new latency sample for sl:{sl_a}, got {latency_count - baseline_count}"
assert latency_sum >= baseline_sum, \
f"Expected latency sum to increase for sl:{sl_a}, before={baseline_sum}, after={latency_sum}"
return True
await wait_for(latency_sample_recorded, deadline=time.time() + 60)
finally:
# If the test failed while the request was still paused at the injection
# point, unblock it so it doesn't hang forever, and clean up the injection
# to avoid affecting subsequent tests.
if query_task is not None:
if not query_task.done():
try:
await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
except Exception:
logger.warning("Failed to unblock paused CQL request", exc_info=True)
try:
await query_task
except Exception:
logger.warning("Paused CQL request failed while cleaning up", exc_info=True)
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
safe_driver_shutdown(cluster_a)