diff --git a/transport/server.cc b/transport/server.cc index 49e53c8c9e..9c909cfb29 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -285,6 +285,13 @@ void cql_sg_stats::register_metrics() {{"scheduling_group_name", cur_sg_name}}) ); + transport_metrics.emplace_back( + sm::make_histogram("cql_request_latency_histogram", [this] { return to_metrics_histogram(_request_latency); }, + sm::description("A histogram of transport-level CQL request latencies (in microseconds), " + "measuring from the start of request processing until the response is written to the socket."), + {{"scheduling_group_name", cur_sg_name}}).aggregate({seastar::metrics::shard_label}).set_skip_when_empty() + ); + new_metrics.add_group("transport", std::move(transport_metrics)); _metrics = std::exchange(new_metrics, {}); } @@ -1154,6 +1161,7 @@ future<> cql_server::connection::process_request() { } auto& f = *maybe_frame; + auto request_start_time = db::timeout_clock::now(); const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive; if (allow_shedding && _shed_incoming_requests) { @@ -1231,14 +1239,14 @@ future<> cql_server::connection::process_request() { ++_server._stats.requests_blocked_memory; } - return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) { + return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested, request_start_time] (auto mem_permit_fut) { if (mem_permit_fut.failed()) { // Ignore semaphore errors - they are expected if load shedding took place mem_permit_fut.ignore_ready_future(); return make_ready_future<>(); } semaphore_units<> mem_permit = mem_permit_fut.get(); - return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable { + return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit)), request_start_time] (fragmented_temporary_buffer buf) mutable { ++_server._stats.requests_served; ++_server._stats.requests_serving; @@ -1267,7 +1275,7 @@ future<> cql_server::connection::process_request() { _process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) : process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit); - future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future>> response_f) mutable { + future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream, request_start_time] (future>> response_f) mutable { try { auto& sg_stats = _server.get_cql_sg_stats(); size_t pending_response_size = 0; @@ -1291,8 +1299,9 @@ future<> cql_server::connection::process_request() { sg_stats._pending_response_memory += pending_response_size; write_response(std::move(response), _compression); } - _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] { + _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size, request_start_time] { sg_stats._pending_response_memory -= pending_response_size; + sg_stats._request_latency.add(db::timeout_clock::now() - request_start_time); }); } catch (...) { clogger.error("{}: request processing failed: {}", diff --git a/transport/server.hh b/transport/server.hh index f522909e3e..7768afca33 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -153,6 +153,12 @@ struct cql_sg_stats { // scheduling group. Incremented when a request starts processing, // decremented after the response is sent. uint32_t _requests_serving = 0; + + // Latency histogram tracking the transport-level request lifetime: + // from the start of request processing until the response is written + // to the socket. This captures processing time and time waiting in the + // response write queue, complementing storage-proxy-level latency. + utils::time_estimated_histogram _request_latency; private: bool _use_metrics = false; seastar::metrics::metric_groups _metrics;