@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_per_service_level_cql_request_latency_histogram(manager: ManagerClient) -> None:
    """Test that the per-service-level cql_request_latency_histogram metric
    records transport-level request latencies for each service level.

    Scenario:
      1. Create a service level, attach a fresh login role to it and open a
         driver session as that role.
      2. Run a warm-up query and take baseline ``count``/``sum`` readings of
         the histogram for the ``sl:<name>`` scheduling group.
      3. Enable the ``transport_cql_request_pause`` injection, start a query
         and wait until the server logs that it is paused.
      4. Read the histogram again while the request is paused, then release
         the request and let the query finish.
      5. Check that no sample was recorded while paused, then poll until at
         least one new sample shows up.

    The ``finally`` block always releases a still-paused request, disables the
    injection and shuts the driver cluster down.
    """
    pause_injection = "transport_cql_request_pause"
    probe_query = "SELECT key FROM system.local"

    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])

    # Create a service level and a user attached to it.
    sl_a = unique_name()
    await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
    await cql.run_async("CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
    await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")

    cluster_a = manager.con_gen([server.ip_addr],
                                manager.port, manager.use_ssl,
                                PlainTextAuthProvider(username='user_a', password='pass_a'))
    cluster_a.schema_metadata_enabled = False
    cluster_a.token_metadata_enabled = False
    query_task = None

    def metric_value(metrics, suffix):
        """Return the histogram series ``<metric>_<suffix>`` for this test's
        service level, treating a missing series (``None``) as 0."""
        value = metrics.get(f"scylla_transport_cql_request_latency_histogram_{suffix}",
                            {'scheduling_group_name': f'sl:{sl_a}'})
        return 0 if value is None else value

    try:
        # Connect inside the try block so the driver cluster is shut down even
        # if establishing the session fails.
        session_a = cluster_a.connect()

        # Warm up the session before enabling the injection, so the paused query
        # is not mixed with connection initialization. The driver call is
        # blocking, so run it in a worker thread rather than on the event loop.
        await asyncio.to_thread(session_a.execute, probe_query)

        metrics = await manager.metrics.query(server.ip_addr)
        baseline_count = metric_value(metrics, "count")
        baseline_sum = metric_value(metrics, "sum")

        await manager.api.enable_injection(server.ip_addr, pause_injection, False)
        log = await manager.server_open_log(server.server_id)
        mark = await log.mark()
        query_task = asyncio.ensure_future(asyncio.to_thread(session_a.execute, probe_query))
        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)

        # Sample the histogram while the request is still parked at the
        # injection point.
        metrics = await manager.metrics.query(server.ip_addr)
        paused_count = metric_value(metrics, "count")

        # Release the request before asserting, so that a failed assertion does
        # not leave the query paused.
        await manager.api.message_injection(server.ip_addr, pause_injection)
        await query_task

        assert paused_count == baseline_count, \
            f"Expected no new latency sample while request is paused, got {paused_count - baseline_count}"

        async def latency_sample_recorded():
            # Report "not ready yet" by returning None instead of asserting:
            # an assertion raised here would escape on the first poll and make
            # the deadline below meaningless.
            # NOTE(review): relies on wait_for re-polling while the predicate
            # returns None - confirm against the helper's contract.
            metrics = await manager.metrics.query(server.ip_addr)
            latency_count = metric_value(metrics, "count")
            if latency_count < baseline_count + 1:
                return None
            return latency_count, metric_value(metrics, "sum")

        _, latency_sum = await wait_for(latency_sample_recorded, deadline=time.time() + 60)
        assert latency_sum >= baseline_sum, \
            f"Expected latency sum to increase for sl:{sl_a}, before={baseline_sum}, after={latency_sum}"
    finally:
        # If the test failed while the request was still paused at the injection
        # point, unblock it so it doesn't hang forever, and clean up the injection
        # to avoid affecting subsequent tests.
        try:
            if query_task is not None and not query_task.done():
                try:
                    await manager.api.message_injection(server.ip_addr, pause_injection)
                except Exception:
                    logger.warning("Failed to unblock paused CQL request", exc_info=True)
                try:
                    await query_task
                except Exception:
                    logger.warning("Paused CQL request failed while cleaning up", exc_info=True)
            await manager.api.disable_injection(server.ip_addr, pause_injection)
        finally:
            # Shut the driver down even when disabling the injection fails.
            safe_driver_shutdown(cluster_a)