Compare commits

...

3 Commits

Author SHA1 Message Date
Piotr Smaron
d193740ee5 docs: add scylla_transport_cql_requests_serving to new metrics list 2026-04-16 08:26:19 +02:00
Piotr Smaron
70e114922f transport: add per-service-level cql_requests_serving metric
Add a per-scheduling-group gauge that tracks the number of CQL requests
currently being processed. This breaks down the existing global
scylla_transport_requests_serving counter by service level, letting
operators see each service level's contribution to the in-flight
request count when debugging latency issues.

The new metric is named cql_requests_serving (exposed as
scylla_transport_cql_requests_serving) and uses the cql_ prefix to
follow the naming convention of all other per-scheduling-group transport
metrics (cql_requests_count, cql_request_bytes, cql_response_bytes,
cql_pending_response_memory). The cql_ prefix also avoids a Prometheus
naming collision with the global requests_serving metric, which has no
scheduling_group_name label -- using the same base name would cause
sum() queries to double-count.

Fixes: SCYLLADB-1340
2026-04-16 08:26:19 +02:00
Piotr Smaron
0c143afea9 test: add xfail test for per-service-level cql_requests_serving metric
Add a test that verifies the per-service-level cql_requests_serving
metric correctly reflects the number of in-flight CQL requests per
scheduling group. The test:

- Creates one service level and one user attached to it, while using
  the cassandra superuser (sl:default) for the second service level
- Opens dedicated CQL sessions for each user with schema and token
  metadata refresh disabled to prevent background driver queries from
  interfering with the error injection and metrics
- Uses error injection (transport_cql_request_pause) to pause requests
  mid-flight inside cql_server::process()
- Sends two requests on the custom service level and one on sl:default
- Verifies the gauges show at least 2 and 1 in-flight requests (using
  >= to tolerate stray requests from the manager's CQL session)
- Releases all paused requests (including any stray ones) and verifies
  counters return to zero using a retry loop

The test is marked xfail because the metric is not yet implemented.

Also adds the transport_cql_request_pause error injection point in
cql_server::process(), which is needed by this test to hold requests
in-flight while observing the gauge.
2026-04-16 08:26:19 +02:00
4 changed files with 124 additions and 0 deletions

View File

@@ -44,6 +44,8 @@ The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_V
- Number of failed tablet auto repair attempts.
* - scylla_tablet_ops_succeeded
- Number of successful tablet auto repair attempts.
* - scylla_transport_cql_requests_serving
- Holds the number of CQL requests that are being processed right now, per service level (scheduling group).
Renamed Metrics in |NEW_VERSION|
--------------------------------------

View File

@@ -532,3 +532,109 @@ async def test_anonymous_user(manager: ManagerClient) -> None:
return
assert False, f"None of clients use sl:default, rows={rows}"
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_per_service_level_cql_requests_serving(manager: ManagerClient) -> None:
    """Test that the per-service-level cql_requests_serving metric correctly
    reflects the number of in-flight CQL requests for each service level.

    Uses error injection to pause requests mid-flight, then verifies each
    per-scheduling-group gauge shows at least the expected count (>= rather
    than == because a stray request from the manager's own CQL session may
    also get paused). Sends two requests on a custom service level and one
    on sl:default (the cassandra superuser) to verify both per-SL counters
    independently.
    """
    cmdline = ['--logger-log-level', 'debug_error_injection=debug']
    server = await manager.server_add(config=auth_config, cmdline=cmdline)
    cql, _ = await manager.get_ready_cql([server])
    # Create one new service level and one new user attached to it.
    # The cassandra superuser already uses sl:default.
    sl_a = unique_name()
    await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
    await cql.run_async("CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
    await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")
    # Open dedicated driver sessions for user_a and the cassandra superuser.
    # Disable schema and token metadata refresh on both to prevent background
    # driver queries from interfering with the error injection and metrics.
    cluster_a = manager.con_gen([server.ip_addr],
        manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a'))
    cluster_a.schema_metadata_enabled = False
    cluster_a.token_metadata_enabled = False
    session_a = cluster_a.connect()
    cluster_default = manager.con_gen([server.ip_addr],
        manager.port, manager.use_ssl, PlainTextAuthProvider(username='cassandra', password='cassandra'))
    cluster_default.schema_metadata_enabled = False
    cluster_default.token_metadata_enabled = False
    session_default = cluster_default.connect()
    try:
        # Enable error injection to pause CQL requests.
        await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False)
        log = await manager.server_open_log(server.server_id)
        # Send two requests from user_a (sl:sl_a) and one from the cassandra
        # superuser (sl:default). Each pauses at the injection point, keeping
        # the cql_requests_serving gauge incremented. Wait for each request's
        # "waiting for message" log line before sending the next one, so we
        # know it is actually parked at the injection point.
        mark = await log.mark()
        task_a1 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
        mark = await log.mark()
        task_a2 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
        mark = await log.mark()
        task_default = asyncio.ensure_future(asyncio.to_thread(session_default.execute, "SELECT * FROM system.local"))
        await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
        # All three requests are now paused. Verify the per-SL gauges.
        # Use >= because a stray request from the manager's CQL session
        # (whose metadata refresh we cannot disable) could also get paused,
        # inflating the sl:default counter.
        # NOTE(review): assumes metrics.get returns a number (not None) for a
        # registered gauge with the given label -- confirm against the
        # metrics helper.
        metrics = await manager.metrics.query(server.ip_addr)
        serving_a = metrics.get("scylla_transport_cql_requests_serving",
                                {'scheduling_group_name': f'sl:{sl_a}'})
        serving_default = metrics.get("scylla_transport_cql_requests_serving",
                                      {'scheduling_group_name': 'sl:default'})
        assert serving_a >= 2, \
            f"Expected at least 2 in-flight requests for sl:{sl_a}, got {serving_a}"
        assert serving_default >= 1, \
            f"Expected at least 1 in-flight request for sl:default, got {serving_default}"
        # Release paused requests by sending messages. Each message_injection
        # call sends one message per shard, waking one handler per shard.
        # Send enough to cover our three requests plus any stray ones.
        # Coerce to int: gauge values scraped from the metrics endpoint may be
        # floats, and range() rejects a float argument.
        total_paused = int(serving_a + serving_default)
        for _ in range(total_paused):
            await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
        # Wait for all requests to complete.
        await task_a1
        await task_a2
        await task_default
        # Disable injection before checking metrics to avoid pausing any
        # stray requests that might arrive.
        await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
        # Verify gauges are back to zero. Use a retry loop because a stray
        # request from the manager's CQL session may be briefly in-flight.
        async def metrics_are_zero():
            # Re-query the metrics endpoint on every retry; fails the
            # assertion until both gauges have drained back to zero.
            metrics = await manager.metrics.query(server.ip_addr)
            serving_a = metrics.get("scylla_transport_cql_requests_serving",
                                    {'scheduling_group_name': f'sl:{sl_a}'})
            serving_default = metrics.get("scylla_transport_cql_requests_serving",
                                          {'scheduling_group_name': 'sl:default'})
            assert serving_a == 0, \
                f"Expected 0 in-flight requests for sl:{sl_a} after release, got {serving_a}"
            assert serving_default == 0, \
                f"Expected 0 in-flight requests for sl:default after release, got {serving_default}"
            return True
        await wait_for(metrics_are_zero, deadline=time.time() + 60)
    finally:
        # Always disable the injection and shut down the driver sessions,
        # even on failure, so later tests on this cluster are unaffected.
        await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
        safe_driver_shutdown(cluster_a)
        safe_driver_shutdown(cluster_default)

View File

@@ -73,6 +73,7 @@
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
#include "utils/histogram_metrics_helper.hh"
#include "utils/error_injection.hh"
// File-local shorthand for the coordinator's result type: carries either a
// value of type T or a coordinator-level error (see
// exceptions::coordinator_result). Defaults to void for operations that
// produce no payload.
template<typename T = void>
using coordinator_result = exceptions::coordinator_result<T>;
@@ -278,6 +279,12 @@ void cql_sg_stats::register_metrics()
{{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
);
transport_metrics.emplace_back(
sm::make_gauge("cql_requests_serving", [this] { return _requests_serving; },
sm::description("Holds the number of requests that are being processed right now."),
{{"scheduling_group_name", cur_sg_name}})
);
new_metrics.add_group("transport", std::move(transport_metrics));
_metrics = std::exchange(new_metrics, {});
}
@@ -1002,6 +1009,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
tracing::stop_foreground(trace_state);
});
--_server._stats.requests_serving;
--_server.get_cql_sg_stats()._requests_serving;
return seastar::futurize_invoke([&] () {
if (f.failed()) {
@@ -1237,6 +1245,7 @@ future<> cql_server::connection::process_request() {
++_server._stats.requests_served;
++_server._stats.requests_serving;
++_server.get_cql_sg_stats()._requests_serving;
_pending_requests_gate.enter();
auto leave = defer([this] {
@@ -1845,6 +1854,8 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
}
} (opcode);
co_await utils::get_local_injector().inject("transport_cql_request_pause", utils::wait_for_message(60s));
bool init_trace = (bool)!bounced; // If the request was bounced, we already started the trace in the handler
auto msg = co_await coroutine::try_future(process_fn(client_state, _query_processor, in, stream,
version, permit, trace_state, init_trace, {}, dialect));

View File

@@ -148,6 +148,11 @@ struct cql_sg_stats {
// Track total memory consumed by responses waiting to be sent.
// Incremented when a response is queued, decremented when the write completes.
int64_t _pending_response_memory = 0;
// Track the number of CQL requests currently being processed in this
// scheduling group. Incremented when a request starts processing,
// decremented when request processing completes.
int32_t _requests_serving = 0;
private:
bool _use_metrics = false;
seastar::metrics::metric_groups _metrics;