mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-20 00:20:47 +00:00
Compare commits
3 Commits
master
...
per_sl_cou
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d193740ee5 | ||
|
|
70e114922f | ||
|
|
0c143afea9 |
@@ -44,6 +44,8 @@ The following metrics are new in ScyllaDB |NEW_VERSION| compared to |PRECEDING_V
|
||||
- Number of failed tablet auto repair attempts.
|
||||
* - scylla_tablet_ops_succeeded
|
||||
- Number of successful tablet auto repair attempts.
|
||||
* - scylla_transport_cql_requests_serving
|
||||
- Holds the number of CQL requests that are being processed right now, per service level (scheduling group).
|
||||
|
||||
Renamed Metrics in |NEW_VERSION|
|
||||
--------------------------------------
|
||||
|
||||
@@ -532,3 +532,109 @@ async def test_anonymous_user(manager: ManagerClient) -> None:
|
||||
return
|
||||
|
||||
assert False, f"None of clients use sl:default, rows={rows}"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_per_service_level_cql_requests_serving(manager: ManagerClient) -> None:
|
||||
"""Test that the per-service-level cql_requests_serving metric correctly
|
||||
reflects the number of in-flight CQL requests for each service level.
|
||||
|
||||
Uses error injection to pause requests mid-flight, then verifies the gauge
|
||||
shows exactly the right count per scheduling group. Sends two requests on
|
||||
a custom service level and one on sl:default (the cassandra superuser) to
|
||||
verify both per-SL counters independently.
|
||||
"""
|
||||
cmdline = ['--logger-log-level', 'debug_error_injection=debug']
|
||||
server = await manager.server_add(config=auth_config, cmdline=cmdline)
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
|
||||
# Create one new service level and one new user attached to it.
|
||||
# The cassandra superuser already uses sl:default.
|
||||
sl_a = unique_name()
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
|
||||
await cql.run_async(f"CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
|
||||
await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")
|
||||
|
||||
# Open dedicated driver sessions for user_a and the cassandra superuser.
|
||||
# Disable schema and token metadata refresh on both to prevent background
|
||||
# driver queries from interfering with the error injection and metrics.
|
||||
cluster_a = manager.con_gen([server.ip_addr],
|
||||
manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a'))
|
||||
cluster_a.schema_metadata_enabled = False
|
||||
cluster_a.token_metadata_enabled = False
|
||||
session_a = cluster_a.connect()
|
||||
|
||||
cluster_default = manager.con_gen([server.ip_addr],
|
||||
manager.port, manager.use_ssl, PlainTextAuthProvider(username='cassandra', password='cassandra'))
|
||||
cluster_default.schema_metadata_enabled = False
|
||||
cluster_default.token_metadata_enabled = False
|
||||
session_default = cluster_default.connect()
|
||||
|
||||
try:
|
||||
# Enable error injection to pause CQL requests.
|
||||
await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False)
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
|
||||
# Send two requests from user_a (sl:sl_a) and one from the cassandra
|
||||
# superuser (sl:default). Each pauses at the injection point, keeping
|
||||
# the cql_requests_serving gauge incremented.
|
||||
mark = await log.mark()
|
||||
task_a1 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
|
||||
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
|
||||
|
||||
mark = await log.mark()
|
||||
task_a2 = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT * FROM system.local"))
|
||||
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
|
||||
|
||||
mark = await log.mark()
|
||||
task_default = asyncio.ensure_future(asyncio.to_thread(session_default.execute, "SELECT * FROM system.local"))
|
||||
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
|
||||
|
||||
# All three requests are now paused. Verify the per-SL gauges.
|
||||
# Use >= because a stray request from the manager's CQL session
|
||||
# (whose metadata refresh we cannot disable) could also get paused,
|
||||
# inflating the sl:default counter.
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
serving_a = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': f'sl:{sl_a}'})
|
||||
serving_default = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': 'sl:default'})
|
||||
assert serving_a >= 2, \
|
||||
f"Expected at least 2 in-flight requests for sl:{sl_a}, got {serving_a}"
|
||||
assert serving_default >= 1, \
|
||||
f"Expected at least 1 in-flight request for sl:default, got {serving_default}"
|
||||
|
||||
# Release paused requests by sending messages. Each message_injection
|
||||
# call sends one message per shard, waking one handler per shard.
|
||||
# Send enough to cover our three requests plus any stray ones.
|
||||
total_paused = serving_a + serving_default
|
||||
for _ in range(total_paused):
|
||||
await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
|
||||
# Wait for all requests to complete.
|
||||
await task_a1
|
||||
await task_a2
|
||||
await task_default
|
||||
|
||||
# Disable injection before checking metrics to avoid pausing any
|
||||
# stray requests that might arrive.
|
||||
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
|
||||
# Verify gauges are back to zero. Use a retry loop because a stray
|
||||
# request from the manager's CQL session may be briefly in-flight.
|
||||
async def metrics_are_zero():
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
serving_a = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': f'sl:{sl_a}'})
|
||||
serving_default = metrics.get("scylla_transport_cql_requests_serving",
|
||||
{'scheduling_group_name': 'sl:default'})
|
||||
assert serving_a == 0, \
|
||||
f"Expected 0 in-flight requests for sl:{sl_a} after release, got {serving_a}"
|
||||
assert serving_default == 0, \
|
||||
f"Expected 0 in-flight requests for sl:default after release, got {serving_default}"
|
||||
return True
|
||||
await wait_for(metrics_are_zero, deadline=time.time() + 60)
|
||||
finally:
|
||||
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
safe_driver_shutdown(cluster_a)
|
||||
safe_driver_shutdown(cluster_default)
|
||||
|
||||
@@ -73,6 +73,7 @@
|
||||
#include "utils/result.hh"
|
||||
#include "utils/reusable_buffer.hh"
|
||||
#include "utils/histogram_metrics_helper.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
template<typename T = void>
|
||||
using coordinator_result = exceptions::coordinator_result<T>;
|
||||
@@ -278,6 +279,12 @@ void cql_sg_stats::register_metrics()
|
||||
{{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
|
||||
);
|
||||
|
||||
transport_metrics.emplace_back(
|
||||
sm::make_gauge("cql_requests_serving", [this] { return _requests_serving; },
|
||||
sm::description("Holds the number of requests that are being processed right now."),
|
||||
{{"scheduling_group_name", cur_sg_name}})
|
||||
);
|
||||
|
||||
new_metrics.add_group("transport", std::move(transport_metrics));
|
||||
_metrics = std::exchange(new_metrics, {});
|
||||
}
|
||||
@@ -1002,6 +1009,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
tracing::stop_foreground(trace_state);
|
||||
});
|
||||
--_server._stats.requests_serving;
|
||||
--_server.get_cql_sg_stats()._requests_serving;
|
||||
|
||||
return seastar::futurize_invoke([&] () {
|
||||
if (f.failed()) {
|
||||
@@ -1237,6 +1245,7 @@ future<> cql_server::connection::process_request() {
|
||||
|
||||
++_server._stats.requests_served;
|
||||
++_server._stats.requests_serving;
|
||||
++_server.get_cql_sg_stats()._requests_serving;
|
||||
|
||||
_pending_requests_gate.enter();
|
||||
auto leave = defer([this] {
|
||||
@@ -1845,6 +1854,8 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
|
||||
}
|
||||
} (opcode);
|
||||
|
||||
co_await utils::get_local_injector().inject("transport_cql_request_pause", utils::wait_for_message(60s));
|
||||
|
||||
bool init_trace = (bool)!bounced; // If the request was bounced, we already started the trace in the handler
|
||||
auto msg = co_await coroutine::try_future(process_fn(client_state, _query_processor, in, stream,
|
||||
version, permit, trace_state, init_trace, {}, dialect));
|
||||
|
||||
@@ -148,6 +148,11 @@ struct cql_sg_stats {
|
||||
// Track total memory consumed by responses waiting to be sent.
|
||||
// Incremented when a response is queued, decremented when the write completes.
|
||||
int64_t _pending_response_memory = 0;
|
||||
|
||||
// Track the number of CQL requests currently being processed in this
|
||||
// scheduling group. Incremented when a request starts processing,
|
||||
// decremented when request processing completes.
|
||||
int32_t _requests_serving = 0;
|
||||
private:
|
||||
bool _use_metrics = false;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
|
||||
Reference in New Issue
Block a user