diff --git a/test/cluster/auth_cluster/test_raft_service_levels.py b/test/cluster/auth_cluster/test_raft_service_levels.py index 1906d8d9bc..6ddc359078 100644 --- a/test/cluster/auth_cluster/test_raft_service_levels.py +++ b/test/cluster/auth_cluster/test_raft_service_levels.py @@ -644,3 +644,86 @@ async def test_per_service_level_cql_requests_serving(manager: ManagerClient) -> await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause") safe_driver_shutdown(cluster_a) safe_driver_shutdown(cluster_default) + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_per_service_level_cql_request_latency_histogram(manager: ManagerClient) -> None: + """Test that the per-service-level cql_request_latency_histogram metric + records transport-level request latencies for each service level. + + Pauses a query on a custom service level and verifies that the histogram + is updated only after the response is sent. + """ + server = await manager.server_add(config=auth_config) + cql, _ = await manager.get_ready_cql([server]) + + # Create a service level and a user attached to it. + sl_a = unique_name() + await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}") + await cql.run_async(f"CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true") + await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a") + + cluster_a = manager.con_gen([server.ip_addr], + manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a')) + cluster_a.schema_metadata_enabled = False + cluster_a.token_metadata_enabled = False + session_a = cluster_a.connect() + query_task = None + + def metric_value(metrics, suffix): + value = metrics.get(f"scylla_transport_cql_request_latency_histogram_{suffix}", + {'scheduling_group_name': f'sl:{sl_a}'}) + return 0 if value is None else value + + try: + # Warm up the session before enabling the injection, so the paused query + # is not mixed with connection initialization. + session_a.execute("SELECT key FROM system.local") + + metrics = await manager.metrics.query(server.ip_addr) + baseline_count = metric_value(metrics, "count") + baseline_sum = metric_value(metrics, "sum") + + await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False) + log = await manager.server_open_log(server.server_id) + mark = await log.mark() + query_task = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT key FROM system.local")) + await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark) + + metrics = await manager.metrics.query(server.ip_addr) + paused_count = metric_value(metrics, "count") + + await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause") + await query_task + + assert paused_count == baseline_count, \ + f"Expected no new latency sample while request is paused, got {paused_count - baseline_count}" + + async def latency_sample_recorded(): + metrics = await manager.metrics.query(server.ip_addr) + latency_count = metric_value(metrics, "count") + latency_sum = metric_value(metrics, "sum") + assert latency_count >= baseline_count + 1, \ + f"Expected at least one new latency sample for sl:{sl_a}, got {latency_count - baseline_count}" + assert latency_sum >= baseline_sum, \ + f"Expected latency sum to increase for sl:{sl_a}, before={baseline_sum}, after={latency_sum}" + return True + + await wait_for(latency_sample_recorded, deadline=time.time() + 60) + finally: + # If the test failed while the request was still paused at the injection + # point, unblock it so it doesn't hang forever, and clean up the injection + # to avoid affecting subsequent tests. + if query_task is not None: + if not query_task.done(): + try: + await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause") + except Exception: + logger.warning("Failed to unblock paused CQL request", exc_info=True) + try: + await query_task + except Exception: + logger.warning("Paused CQL request failed while cleaning up", exc_info=True) + await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause") + safe_driver_shutdown(cluster_a) diff --git a/transport/server.cc b/transport/server.cc index 49e53c8c9e..9c909cfb29 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -285,6 +285,13 @@ void cql_sg_stats::register_metrics() {{"scheduling_group_name", cur_sg_name}}) ); + transport_metrics.emplace_back( + sm::make_histogram("cql_request_latency_histogram", [this] { return to_metrics_histogram(_request_latency); }, + sm::description("A histogram of transport-level CQL request latencies (in microseconds), " + "measuring from the start of request processing until the response is written to the socket."), + {{"scheduling_group_name", cur_sg_name}}).aggregate({seastar::metrics::shard_label}).set_skip_when_empty() + ); + new_metrics.add_group("transport", std::move(transport_metrics)); _metrics = std::exchange(new_metrics, {}); } @@ -1154,6 +1161,7 @@ future<> cql_server::connection::process_request() { } auto& f = *maybe_frame; + auto request_start_time = db::timeout_clock::now(); const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive; if (allow_shedding && _shed_incoming_requests) { @@ -1231,14 +1239,14 @@ future<> cql_server::connection::process_request() { ++_server._stats.requests_blocked_memory; } - return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) { + return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested, request_start_time] (auto mem_permit_fut) { if (mem_permit_fut.failed()) { // Ignore semaphore errors - they are expected if load shedding took place mem_permit_fut.ignore_ready_future(); return make_ready_future<>(); } semaphore_units<> mem_permit = mem_permit_fut.get(); - return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable { + return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit)), request_start_time] (fragmented_temporary_buffer buf) mutable { ++_server._stats.requests_served; ++_server._stats.requests_serving; @@ -1267,7 +1275,7 @@ future<> cql_server::connection::process_request() { _process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) : process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit); - future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future>> response_f) mutable { + future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream, request_start_time] (future>> response_f) mutable { try { auto& sg_stats = _server.get_cql_sg_stats(); size_t pending_response_size = 0; @@ -1291,8 +1299,9 @@ future<> cql_server::connection::process_request() { sg_stats._pending_response_memory += pending_response_size; write_response(std::move(response), _compression); } - _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] { + _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size, request_start_time] { sg_stats._pending_response_memory -= pending_response_size; + sg_stats._request_latency.add(db::timeout_clock::now() - request_start_time); }); } catch (...) { clogger.error("{}: request processing failed: {}", diff --git a/transport/server.hh b/transport/server.hh index f522909e3e..7768afca33 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -153,6 +153,12 @@ struct cql_sg_stats { // scheduling group. Incremented when a request starts processing, // decremented after the response is sent. uint32_t _requests_serving = 0; + + // Latency histogram tracking the transport-level request lifetime: + // from the start of request processing until the response is written + // to the socket. This captures processing time and time waiting in the + // response write queue, complementing storage-proxy-level latency. + utils::time_estimated_histogram _request_latency; private: bool _use_metrics = false; seastar::metrics::metric_groups _metrics;