transport: add per-service-level cql_requests_serving metric

Add a per-scheduling-group gauge that tracks the number of in-flight CQL
requests for each service level. The existing scylla_transport_requests_serving
metric is a single global per-shard counter; the new metric breaks it down
by scheduling group so operators can see which service level contributes
the most in-flight requests when debugging latency.

The metric is named cql_requests_serving (exposed as
scylla_transport_cql_requests_serving) following the cql_ prefix convention
used by all other per-scheduling-group transport metrics (cql_requests_count,
cql_request_bytes, cql_response_bytes, cql_pending_response_memory). Using
a cql_ prefix avoids Prometheus confusion with the global requests_serving
metric, which lacks the scheduling_group_name label.

The counter is incremented when a request enters process_request() and
decremented in the same 'leave' defer block as the global requests_serving,
ensuring the request is counted as in-flight until the response is sent.
This commit is contained in:
Piotr Smaron
2026-04-17 15:07:14 +02:00
parent 4988077249
commit 218f8adc8f
3 changed files with 130 additions and 1 deletion

View File

@@ -532,3 +532,115 @@ async def test_anonymous_user(manager: ManagerClient) -> None:
return
assert False, f"None of clients use sl:default, rows={rows}"
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_per_service_level_cql_requests_serving(manager: ManagerClient) -> None:
    """Test that the per-service-level cql_requests_serving metric correctly
    reflects the number of in-flight CQL requests for each service level.

    Uses error injection to pause requests mid-flight, then verifies the gauge
    shows exactly the right count per scheduling group. Sends two requests on
    a custom service level and one on sl:default (the cassandra superuser) to
    verify both per-SL counters independently.
    """
    cmdline = ['--logger-log-level', 'debug_error_injection=debug']
    server = await manager.server_add(config=auth_config, cmdline=cmdline)
    cql, _ = await manager.get_ready_cql([server])

    # Create one new service level and one new user attached to it.
    # The cassandra superuser already uses sl:default.
    sl_a = unique_name()
    await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
    await cql.run_async(f"CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
    await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")

    # Open dedicated driver sessions for user_a and the cassandra superuser.
    # Disable schema and token metadata refresh on both to prevent background
    # driver queries from interfering with the error injection and metrics.
    cluster_a = manager.con_gen([server.ip_addr],
        manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a'))
    cluster_a.schema_metadata_enabled = False
    cluster_a.token_metadata_enabled = False
    session_a = cluster_a.connect()
    cluster_default = manager.con_gen([server.ip_addr],
        manager.port, manager.use_ssl, PlainTextAuthProvider(username='cassandra', password='cassandra'))
    cluster_default.schema_metadata_enabled = False
    cluster_default.token_metadata_enabled = False
    session_default = cluster_default.connect()
    try:
        # Warm up both sessions to ensure the driver has established connections
        # before enabling injection. Without this, execute() may fail with
        # NoHostAvailable and the query never reaches the server.
        session_a.execute("SELECT key FROM system.local")
        session_default.execute("SELECT key FROM system.local")

        # Enable error injection to pause CQL requests.
        await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False)
        log = await manager.server_open_log(server.server_id)

        # Send two requests from user_a (sl:sl_a) and one from the cassandra
        # superuser (sl:default). Each pauses at the injection point, keeping
        # the cql_requests_serving gauge incremented. Wait for each pause log
        # line before sending the next request, so all three are known to be
        # parked at the injection point.
        mark = await log.mark()
        task_a1 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
        mark = await log.mark()
        task_a2 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
        mark = await log.mark()
        task_default = asyncio.ensure_future(asyncio.to_thread(session_default.execute, "SELECT * FROM system.local"))
        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)

        # All three requests are now paused. Verify the per-SL gauges.
        # Use >= because a stray request from the manager's CQL session
        # (whose metadata refresh we cannot disable) could also get paused,
        # inflating the sl:default counter.
        metrics = await manager.metrics.query(server.ip_addr)
        serving_a = metrics.get("scylla_transport_cql_requests_serving",
                                {'scheduling_group_name': f'sl:{sl_a}'})
        serving_default = metrics.get("scylla_transport_cql_requests_serving",
                                      {'scheduling_group_name': 'sl:default'})
        assert serving_a >= 2, \
            f"Expected at least 2 in-flight requests for sl:{sl_a}, got {serving_a}"
        assert serving_default >= 1, \
            f"Expected at least 1 in-flight request for sl:default, got {serving_default}"

        # Release paused requests by sending messages. Each message_injection
        # call sends one message per shard, waking one handler per shard.
        # Send enough to cover our three requests plus any stray ones.
        total_paused = int(serving_a + serving_default)
        for _ in range(total_paused):
            await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")

        # Wait for all requests to complete.
        await task_a1
        await task_a2
        await task_default

        # Disable injection before checking metrics to avoid pausing any
        # stray requests that might arrive.
        await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")

        # Verify gauges are back to zero. Use a retry loop because a stray
        # request from the manager's CQL session may be briefly in-flight.
        # NOTE: the predicate must return None (not raise AssertionError) on
        # a nonzero gauge — wait_for() only retries while the predicate
        # returns None, and a raised assertion would propagate out on the
        # first probe and defeat the retry loop entirely.
        async def metrics_are_zero():
            metrics = await manager.metrics.query(server.ip_addr)
            serving_a = metrics.get("scylla_transport_cql_requests_serving",
                                    {'scheduling_group_name': f'sl:{sl_a}'})
            serving_default = metrics.get("scylla_transport_cql_requests_serving",
                                          {'scheduling_group_name': 'sl:default'})
            if serving_a != 0 or serving_default != 0:
                # Still draining (or a stray request is in flight) — retry.
                return None
            return True
        await wait_for(metrics_are_zero, deadline=time.time() + 60)
    finally:
        await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
        safe_driver_shutdown(cluster_a)
        safe_driver_shutdown(cluster_default)

View File

@@ -73,6 +73,7 @@
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
#include "utils/histogram_metrics_helper.hh"
#include "utils/error_injection.hh"
template<typename T = void>
using coordinator_result = exceptions::coordinator_result<T>;
@@ -278,6 +279,12 @@ void cql_sg_stats::register_metrics()
{{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
);
transport_metrics.emplace_back(
sm::make_gauge("cql_requests_serving", [this] { return _requests_serving; },
sm::description("Holds the number of requests that are being processed right now."),
{{"scheduling_group_name", cur_sg_name}})
);
new_metrics.add_group("transport", std::move(transport_metrics));
_metrics = std::exchange(new_metrics, {});
}
@@ -1235,10 +1242,13 @@ future<> cql_server::connection::process_request() {
++_server._stats.requests_served;
++_server._stats.requests_serving;
++_server.get_cql_sg_stats()._requests_serving;
_pending_requests_gate.enter();
auto leave = defer([this] {
auto& sg_stats = _server.get_cql_sg_stats();
auto leave = defer([this, &sg_stats] {
--_server._stats.requests_serving;
--sg_stats._requests_serving;
_shedding_timer.cancel();
_shed_incoming_requests = false;
_pending_requests_gate.leave();
@@ -1844,6 +1854,8 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
}
} (opcode);
co_await utils::get_local_injector().inject("transport_cql_request_pause", utils::wait_for_message(60s));
bool init_trace = (bool)!bounced; // If the request was bounced, we already started the trace in the handler
auto msg = co_await coroutine::try_future(process_fn(client_state, _query_processor, in, stream,
version, permit, trace_state, init_trace, {}, dialect));

View File

@@ -148,6 +148,11 @@ struct cql_sg_stats {
// Track total memory consumed by responses waiting to be sent.
// Incremented when a response is queued, decremented when the write completes.
int64_t _pending_response_memory = 0;
// Track the number of CQL requests currently being processed in this
// scheduling group. Incremented when a request starts processing,
// decremented after the response is sent.
uint32_t _requests_serving = 0;
private:
bool _use_metrics = false;
seastar::metrics::metric_groups _metrics;