mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-25 09:11:10 +00:00
Merge 'transport: add per-service-level transport request latency histogram' from Piotr Smaron and Marcin Maliszkiewicz
Add a per-scheduling-group latency histogram on the transport level that measures the full CQL request lifetime: from fetching the request buffer until the response is written to the socket. Today latencies are accounted only on the storage proxy level, leaving the time spent in the transport layer (response queue wait + actual I/O) unaccounted. Having both transport and storage proxy latencies allows operators to tell where latency accumulates. The metric is exposed as scylla_transport_cql_request_latency_histogram with the scheduling_group_name label, following the cql_ prefix convention of all other per-SG transport metrics. Fixes: SCYLLADB-1691 New feature, no backport. Closes scylladb/scylladb#29878 * github.com:scylladb/scylladb: test/cluster: add test for per-service-level transport request latency histogram transport: add per-service-level transport request latency histogram
This commit is contained in:
@@ -644,3 +644,86 @@ async def test_per_service_level_cql_requests_serving(manager: ManagerClient) ->
|
||||
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
safe_driver_shutdown(cluster_a)
|
||||
safe_driver_shutdown(cluster_default)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_per_service_level_cql_request_latency_histogram(manager: ManagerClient) -> None:
|
||||
"""Test that the per-service-level cql_request_latency_histogram metric
|
||||
records transport-level request latencies for each service level.
|
||||
|
||||
Pauses a query on a custom service level and verifies that the histogram
|
||||
is updated only after the response is sent.
|
||||
"""
|
||||
server = await manager.server_add(config=auth_config)
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
|
||||
# Create a service level and a user attached to it.
|
||||
sl_a = unique_name()
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL {sl_a}")
|
||||
await cql.run_async(f"CREATE ROLE user_a WITH PASSWORD = 'pass_a' AND LOGIN = true")
|
||||
await cql.run_async(f"ATTACH SERVICE LEVEL {sl_a} TO user_a")
|
||||
|
||||
cluster_a = manager.con_gen([server.ip_addr],
|
||||
manager.port, manager.use_ssl, PlainTextAuthProvider(username='user_a', password='pass_a'))
|
||||
cluster_a.schema_metadata_enabled = False
|
||||
cluster_a.token_metadata_enabled = False
|
||||
session_a = cluster_a.connect()
|
||||
query_task = None
|
||||
|
||||
def metric_value(metrics, suffix):
|
||||
value = metrics.get(f"scylla_transport_cql_request_latency_histogram_{suffix}",
|
||||
{'scheduling_group_name': f'sl:{sl_a}'})
|
||||
return 0 if value is None else value
|
||||
|
||||
try:
|
||||
# Warm up the session before enabling the injection, so the paused query
|
||||
# is not mixed with connection initialization.
|
||||
session_a.execute("SELECT key FROM system.local")
|
||||
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
baseline_count = metric_value(metrics, "count")
|
||||
baseline_sum = metric_value(metrics, "sum")
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, "transport_cql_request_pause", False)
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
mark = await log.mark()
|
||||
query_task = asyncio.ensure_future(asyncio.to_thread(session_a.execute, "SELECT key FROM system.local"))
|
||||
await log.wait_for("transport_cql_request_pause: waiting for message", from_mark=mark)
|
||||
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
paused_count = metric_value(metrics, "count")
|
||||
|
||||
await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
await query_task
|
||||
|
||||
assert paused_count == baseline_count, \
|
||||
f"Expected no new latency sample while request is paused, got {paused_count - baseline_count}"
|
||||
|
||||
async def latency_sample_recorded():
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
latency_count = metric_value(metrics, "count")
|
||||
latency_sum = metric_value(metrics, "sum")
|
||||
assert latency_count >= baseline_count + 1, \
|
||||
f"Expected at least one new latency sample for sl:{sl_a}, got {latency_count - baseline_count}"
|
||||
assert latency_sum >= baseline_sum, \
|
||||
f"Expected latency sum to increase for sl:{sl_a}, before={baseline_sum}, after={latency_sum}"
|
||||
return True
|
||||
|
||||
await wait_for(latency_sample_recorded, deadline=time.time() + 60)
|
||||
finally:
|
||||
# If the test failed while the request was still paused at the injection
|
||||
# point, unblock it so it doesn't hang forever, and clean up the injection
|
||||
# to avoid affecting subsequent tests.
|
||||
if query_task is not None:
|
||||
if not query_task.done():
|
||||
try:
|
||||
await manager.api.message_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
except Exception:
|
||||
logger.warning("Failed to unblock paused CQL request", exc_info=True)
|
||||
try:
|
||||
await query_task
|
||||
except Exception:
|
||||
logger.warning("Paused CQL request failed while cleaning up", exc_info=True)
|
||||
await manager.api.disable_injection(server.ip_addr, "transport_cql_request_pause")
|
||||
safe_driver_shutdown(cluster_a)
|
||||
|
||||
@@ -285,6 +285,13 @@ void cql_sg_stats::register_metrics()
|
||||
{{"scheduling_group_name", cur_sg_name}})
|
||||
);
|
||||
|
||||
transport_metrics.emplace_back(
|
||||
sm::make_histogram("cql_request_latency_histogram", [this] { return to_metrics_histogram(_request_latency); },
|
||||
sm::description("A histogram of transport-level CQL request latencies (in microseconds), "
|
||||
"measuring from the start of request processing until the response is written to the socket."),
|
||||
{{"scheduling_group_name", cur_sg_name}}).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()
|
||||
);
|
||||
|
||||
new_metrics.add_group("transport", std::move(transport_metrics));
|
||||
_metrics = std::exchange(new_metrics, {});
|
||||
}
|
||||
@@ -1154,6 +1161,7 @@ future<> cql_server::connection::process_request() {
|
||||
}
|
||||
|
||||
auto& f = *maybe_frame;
|
||||
auto request_start_time = db::timeout_clock::now();
|
||||
|
||||
const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive;
|
||||
if (allow_shedding && _shed_incoming_requests) {
|
||||
@@ -1231,14 +1239,14 @@ future<> cql_server::connection::process_request() {
|
||||
++_server._stats.requests_blocked_memory;
|
||||
}
|
||||
|
||||
return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) {
|
||||
return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested, request_start_time] (auto mem_permit_fut) {
|
||||
if (mem_permit_fut.failed()) {
|
||||
// Ignore semaphore errors - they are expected if load shedding took place
|
||||
mem_permit_fut.ignore_ready_future();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
semaphore_units<> mem_permit = mem_permit_fut.get();
|
||||
return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable {
|
||||
return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit)), request_start_time] (fragmented_temporary_buffer buf) mutable {
|
||||
|
||||
++_server._stats.requests_served;
|
||||
++_server._stats.requests_serving;
|
||||
@@ -1267,7 +1275,7 @@ future<> cql_server::connection::process_request() {
|
||||
_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
|
||||
process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
|
||||
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream, request_start_time] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
try {
|
||||
auto& sg_stats = _server.get_cql_sg_stats();
|
||||
size_t pending_response_size = 0;
|
||||
@@ -1291,8 +1299,9 @@ future<> cql_server::connection::process_request() {
|
||||
sg_stats._pending_response_memory += pending_response_size;
|
||||
write_response(std::move(response), _compression);
|
||||
}
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] {
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size, request_start_time] {
|
||||
sg_stats._pending_response_memory -= pending_response_size;
|
||||
sg_stats._request_latency.add(db::timeout_clock::now() - request_start_time);
|
||||
});
|
||||
} catch (...) {
|
||||
clogger.error("{}: request processing failed: {}",
|
||||
|
||||
@@ -153,6 +153,12 @@ struct cql_sg_stats {
|
||||
// scheduling group. Incremented when a request starts processing,
|
||||
// decremented after the response is sent.
|
||||
uint32_t _requests_serving = 0;
|
||||
|
||||
// Latency histogram tracking the transport-level request lifetime:
|
||||
// from the start of request processing until the response is written
|
||||
// to the socket. This captures processing time and time waiting in the
|
||||
// response write queue, complementing storage-proxy-level latency.
|
||||
utils::time_estimated_histogram _request_latency;
|
||||
private:
|
||||
bool _use_metrics = false;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
|
||||
Reference in New Issue
Block a user