diff --git a/test/cluster/auth_cluster/test_raft_service_levels.py b/test/cluster/auth_cluster/test_raft_service_levels.py
index e5e19f9636..1906d8d9bc 100644
--- a/test/cluster/auth_cluster/test_raft_service_levels.py
+++ b/test/cluster/auth_cluster/test_raft_service_levels.py
@@ -532,3 +532,115 @@ async def test_anonymous_user(manager: ManagerClient) -> None:
             return
 
     assert False, f"None of clients use sl:default, rows={rows}"
+
+@pytest.mark.asyncio
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_per_service_level_cql_requests_serving(manager: ManagerClient) -> None:
+    """Test that the per-service-level cql_requests_serving metric correctly
+    reflects the number of in-flight CQL requests for each service level.
+
+    Uses error injection to pause requests mid-flight, then verifies the gauge
+    shows at least the expected count per scheduling group. Sends two requests
+    on a custom service level and one on sl:default (the cassandra superuser)
+    to verify both per-SL counters independently.
+    """
+    cmdline = ['--logger-log-level', 'debug_error_injection=debug']
+    server = await manager.server_add(config=auth_config, cmdline=cmdline)
+    cql, _ = await manager.get_ready_cql([server])
+
+    # Create one new service level and one new user attached to it.
+    # The cassandra superuser already uses sl:default.
+    sl_a = unique_name()
+    await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
+    await cql.run_async("CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
+    await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")
+
+    # Open dedicated driver sessions for user_a and the cassandra superuser.
+    # Disable schema and token metadata refresh on both to prevent background
+    # driver queries from interfering with the error injection and metrics.
+    cluster_a = manager.con_gen([server.ip_addr],
+        manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a'))
+    cluster_a.schema_metadata_enabled = False
+    cluster_a.token_metadata_enabled = False
+    session_a = cluster_a.connect()
+
+    cluster_default = manager.con_gen([server.ip_addr],
+        manager.port, manager.use_ssl, PlainTextAuthProvider(username='cassandra', password='cassandra'))
+    cluster_default.schema_metadata_enabled = False
+    cluster_default.token_metadata_enabled = False
+    session_default = cluster_default.connect()
+
+    try:
+        # Warm up both sessions to ensure the driver has established connections
+        # before enabling injection. Without this, execute() may fail with
+        # NoHostAvailable and the query never reaches the server.
+        session_a.execute("SELECT key FROM system.local")
+        session_default.execute("SELECT key FROM system.local")
+
+        # Enable error injection to pause CQL requests.
+        await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False)
+        log = await manager.server_open_log(server.server_id)
+
+        # Send two requests from user_a (sl:sl_a) and one from the cassandra
+        # superuser (sl:default). Each pauses at the injection point, keeping
+        # the cql_requests_serving gauge incremented.
+        mark = await log.mark()
+        task_a1 = asyncio.create_task(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
+        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
+
+        mark = await log.mark()
+        task_a2 = asyncio.create_task(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
+        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
+
+        mark = await log.mark()
+        task_default = asyncio.create_task(asyncio.to_thread(session_default.execute, "SELECT * FROM system.local"))
+        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
+
+        # All three requests are now paused. Verify the per-SL gauges.
+        # Use >= because a stray request from the manager's CQL session
+        # (whose metadata refresh we cannot disable) could also get paused,
+        # inflating the sl:default counter.
+        metrics = await manager.metrics.query(server.ip_addr)
+        serving_a = metrics.get("scylla_transport_cql_requests_serving",
+                                {'scheduling_group_name': f'sl:{sl_a}'})
+        serving_default = metrics.get("scylla_transport_cql_requests_serving",
+                                      {'scheduling_group_name': 'sl:default'})
+        assert serving_a >= 2, \
+            f"Expected at least 2 in-flight requests for sl:{sl_a}, got {serving_a}"
+        assert serving_default >= 1, \
+            f"Expected at least 1 in-flight request for sl:default, got {serving_default}"
+
+        # Release paused requests by sending messages. Each message_injection
+        # call sends one message per shard, waking one handler per shard.
+        # Send enough to cover our three requests plus any stray ones.
+        total_paused = int(serving_a + serving_default)
+        for _ in range(total_paused):
+            await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
+
+        # Wait for all requests to complete.
+        await task_a1
+        await task_a2
+        await task_default
+
+        # Disable injection before checking metrics to avoid pausing any
+        # stray requests that might arrive.
+        await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
+
+        # Verify gauges are back to zero. Use a retry loop because a stray
+        # request from the manager's CQL session may be briefly in-flight.
+        async def metrics_are_zero():
+            metrics = await manager.metrics.query(server.ip_addr)
+            serving_a = metrics.get("scylla_transport_cql_requests_serving",
+                                    {'scheduling_group_name': f'sl:{sl_a}'})
+            serving_default = metrics.get("scylla_transport_cql_requests_serving",
+                                          {'scheduling_group_name': 'sl:default'})
+            # Signal "not yet" by returning None so wait_for() polls again; an
+            # assert here would fail on the first non-zero sample, not retry.
+            if serving_a == 0 and serving_default == 0:
+                return True
+            return None
+        await wait_for(metrics_are_zero, deadline=time.time() + 60)
+    finally:
+        await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
+        safe_driver_shutdown(cluster_a)
+        safe_driver_shutdown(cluster_default)
diff --git a/transport/server.cc b/transport/server.cc
index 8e66a4ed52..264de5fb5a 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -73,6 +73,7 @@
 #include "utils/result.hh"
 #include "utils/reusable_buffer.hh"
 #include "utils/histogram_metrics_helper.hh"
+#include "utils/error_injection.hh"
 
 template<typename T = void>
 using coordinator_result = exceptions::coordinator_result<T>;
@@ -278,6 +279,12 @@ void cql_sg_stats::register_metrics()
             {{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
     );
 
+    transport_metrics.emplace_back(
+        sm::make_gauge("cql_requests_serving", [this] { return _requests_serving; },
+            sm::description("Holds the number of requests that are being processed right now."),
+            {{"scheduling_group_name", cur_sg_name}})
+    );
+
     new_metrics.add_group("transport", std::move(transport_metrics));
     _metrics = std::exchange(new_metrics, {});
 }
@@ -1235,10 +1242,13 @@ future<> cql_server::connection::process_request() {
 
         ++_server._stats.requests_served;
         ++_server._stats.requests_serving;
+        auto& sg_stats = _server.get_cql_sg_stats();
+        ++sg_stats._requests_serving;
 
         _pending_requests_gate.enter();
-        auto leave = defer([this] {
+        auto leave = defer([this, &sg_stats] {
             --_server._stats.requests_serving;
+            --sg_stats._requests_serving;
             _shedding_timer.cancel();
             _shed_incoming_requests = false;
             _pending_requests_gate.leave();
@@ -1844,6 +1854,8 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
         }
     } (opcode);
 
+    co_await utils::get_local_injector().inject("transport_cql_request_pause", utils::wait_for_message(60s));
+
     bool init_trace = (bool)!bounced; // If the request was bounced, we already started the trace in the handler
 
     auto msg = co_await coroutine::try_future(process_fn(client_state, _query_processor, in, stream, version, permit, trace_state, init_trace, {}, dialect));
diff --git a/transport/server.hh b/transport/server.hh
index 31223bff57..64342ca04d 100644
--- a/transport/server.hh
+++ b/transport/server.hh
@@ -148,6 +148,11 @@ struct cql_sg_stats {
     // Track total memory consumed by responses waiting to be sent.
     // Incremented when a response is queued, decremented when the write completes.
     int64_t _pending_response_memory = 0;
+
+    // Track the number of CQL requests currently being processed in this
+    // scheduling group. Incremented when a request starts processing,
+    // decremented after the response is sent.
+    uint32_t _requests_serving = 0;
 private:
     bool _use_metrics = false;
     seastar::metrics::metric_groups _metrics;