Compare commits

...

1 Commits

Author SHA1 Message Date
Yaniv Michael Kaul
4d44ee3bb4 transport: move latency histogram marking to CQL transport flush path
Move the latency counter start and histogram marking from
storage_proxy to the CQL transport layer so that histograms
include the time spent waiting in the response queue and
flushing to the OS socket.

The latency counter is started when the CQL request arrives
at process_query/execute/batch. Each statement sets the
appropriate histogram (read/write/range/cas_read/cas_write)
on query_state. The transport layer extracts the deferred
mark and records it in _ready_to_respond.finally(), after
the response has been written to the socket.

Internal writes (hints, view updates) that bypass the CQL
transport still mark latency directly in send_to_endpoint.
Forwarded requests mark on the target node before returning.
Shard-bounced requests mark on the target shard and reset
the mark before crossing shard boundaries.

Fixes: scylladb/scylladb#23189
2026-05-10 08:18:31 +03:00
10 changed files with 278 additions and 92 deletions

View File

@@ -275,9 +275,12 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
_statements[i].statement->restrictions().validate_primary_key(options.for_statement(i));
}
// Set the histogram for deferred latency marking.
auto& stats = qp.proxy().get_stats();
if (_has_conditions) {
++_stats.cas_batches;
_stats.statements_in_cas_batches += _statements.size();
query_state.set_latency_histogram(stats.cas_write);
return execute_with_conditions(qp, options, query_state).then([guardrail_state, cl] (auto result) {
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Using write consistency level {} listed on the "
@@ -290,10 +293,13 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
++_stats.batches;
_stats.statements_in_batches += _statements.size();
query_state.set_latency_histogram(stats.write);
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
auto defer_latency = query_state.has_deferred_latency();
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, cl, timeout, tr_state = query_state.get_trace_state(),
permit = query_state.get_permit()] (utils::chunked_vector<mutation> ms) mutable {
return execute_without_conditions(qp, std::move(ms), cl, timeout, std::move(tr_state), std::move(permit));
permit = query_state.get_permit(), defer_latency] (utils::chunked_vector<mutation> ms) mutable {
return execute_without_conditions(qp, std::move(ms), cl, timeout, std::move(tr_state), std::move(permit), defer_latency);
}).then([guardrail_state, cl] (coordinator_result<> res) {
if (!res) {
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
@@ -314,7 +320,8 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
db::consistency_level cl,
db::timeout_clock::time_point timeout,
tracing::trace_state_ptr tr_state,
service_permit permit) const
service_permit permit,
bool defer_coordinator_latency_mark) const
{
// FIXME: do we need to do this?
#if 0
@@ -341,7 +348,9 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
mutate_atomic = false;
}
}
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit), db::allow_per_partition_rate_limit::yes);
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit), db::allow_per_partition_rate_limit::yes, false, {
.defer_coordinator_latency_mark = defer_coordinator_latency_mark,
});
}
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_with_conditions(
@@ -402,7 +411,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
auto* request_ptr = request.get();
return qp.proxy().cas(schema, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state(), {}, {}, service::node_local_only::no, qs.has_deferred_latency()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request = std::move(request)] (bool is_applied) {
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
});
@@ -490,4 +499,3 @@ audit::statement_category batch_statement::category() const {
}

View File

@@ -140,7 +140,8 @@ private:
db::consistency_level cl,
db::timeout_clock::time_point timeout,
tracing::trace_state_ptr tr_state,
service_permit permit) const;
service_permit permit,
bool defer_coordinator_latency_mark) const;
future<shared_ptr<cql_transport::messages::result_message>> execute_with_conditions(
query_processor& qp,

View File

@@ -281,7 +281,10 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
_restrictions->validate_primary_key(options);
// Set the histogram for deferred latency marking.
auto& stats = qp.proxy().get_stats();
if (has_conditions()) {
qs.set_latency_histogram(stats.cas_write);
auto result = co_await execute_with_condition(qp, qs, options);
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Using write consistency level {} listed on the "
@@ -290,6 +293,8 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
co_return result;
}
qs.set_latency_histogram(stats.write);
json_cache_opt json_cache = maybe_prepare_json_cache(options);
std::vector<dht::partition_range> keys = build_partition_keys(options, json_cache);
@@ -334,7 +339,8 @@ modification_statement::execute_without_condition(query_processor& qp, service::
}
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, false, qs.get_trace_state(), qs.get_permit(), db::allow_per_partition_rate_limit::yes, this->is_raw_counter_shard_write(), {
.node_local_only = options.get_specific_options().node_local_only
.node_local_only = options.get_specific_options().node_local_only,
.defer_coordinator_latency_mark = qs.has_deferred_latency(),
});
});
}
@@ -450,7 +456,7 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
}
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state(), {}, {}, service::node_local_only::no, qs.has_deferred_latency()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
result->add_tablet_info(tablet_replicas, token_range);

View File

@@ -482,6 +482,19 @@ select_statement::do_execute(query_processor& qp,
auto key_ranges = _restrictions->get_partition_key_ranges(options);
// Set the histogram for deferred latency marking.
// Serial consistency reads go through paxos (cas_read histogram),
// non-serial reads use read or range histograms.
{
auto& stats = qp.proxy().get_stats();
if (db::is_serial_consistency(options.get_consistency())) {
state.set_latency_histogram(stats.cas_read);
} else {
bool is_range = key_ranges.empty() || !query::is_single_partition(key_ranges.front());
state.set_latency_histogram(is_range ? stats.range : stats.read);
}
}
auto token = dht::token();
std::optional<locator::tablet_routing_info> tablet_info = {};
@@ -765,7 +778,7 @@ view_indexed_table_select_statement::do_execute_base_query(
if (previous_result_size < query::result_memory_limiter::maximum_result_size && concurrency < max_base_table_query_concurrency) {
concurrency *= 2;
}
coordinator_result<service::storage_proxy::coordinator_query_result> rqr = co_await qp.proxy().query_result(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
coordinator_result<service::storage_proxy::coordinator_query_result> rqr = co_await qp.proxy().query_result(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()});
if (!rqr.has_value()) {
co_return std::move(rqr).as_failure();
}
@@ -837,7 +850,7 @@ view_indexed_table_select_statement::do_execute_base_query(
command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
}
coordinator_result<service::storage_proxy::coordinator_query_result> rqr
= co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
= co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()});
if (!rqr.has_value()) {
co_return std::move(rqr).as_failure();
}
@@ -912,7 +925,7 @@ select_statement::execute_without_checking_exception_message_non_aggregate_unpag
command,
std::move(prange),
options.get_consistency(),
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only},
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only, state.has_deferred_latency()},
cas_shard).then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) {
return make_ready_future<coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(qr.query_result));
}));
@@ -921,7 +934,7 @@ select_statement::execute_without_checking_exception_message_non_aggregate_unpag
return this->process_results(std::move(result), cmd, options, now);
}));
} else {
return qp.proxy().query_result(_query_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only}, std::move(cas_shard))
return qp.proxy().query_result(_query_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only, state.has_deferred_latency()}, std::move(cas_shard))
.then(wrap_result_to_error_message([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
return this->process_results(std::move(qr.query_result), cmd, options, now);
}));
@@ -1199,6 +1212,10 @@ view_indexed_table_select_statement::actually_do_execute(query_processor& qp,
validate_for_read(cl);
// Secondary index reads always go through proxy().query_result().
// Mark as read since these are always single-partition lookups by primary key.
state.set_latency_histogram(qp.proxy().get_stats().read);
auto now = gc_clock::now();
++_stats.secondary_index_reads;
@@ -1431,7 +1448,7 @@ view_indexed_table_select_statement::read_posting_list(query_processor& qp,
int32_t page_size = options.get_page_size();
if (page_size <= 0 || !service::pager::query_pagers::may_need_paging(*_view_schema, page_size, *cmd, partition_ranges)) {
return qp.proxy().query_result(_view_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
return qp.proxy().query_result(_view_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()})
.then(utils::result_wrap([this, now, &options, selection = std::move(selection), partition_slice = std::move(partition_slice)] (service::storage_proxy::coordinator_query_result qr)
-> coordinator_result<::shared_ptr<cql_transport::messages::result_message::rows>> {
cql3::selection::result_set_builder builder(*selection, now, &options);
@@ -1883,7 +1900,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
|| !service::pager::query_pagers::may_need_paging(*_schema, page_size,
*command, key_ranges))) {
return do_query(erm_keepalive, {}, qp.proxy(), _schema, command, std::move(key_ranges), cl,
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}})
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()})
.then(wrap_result_to_error_message([this, erm_keepalive, now, &options, slice = command->slice] (service::storage_proxy_coordinator_query_result&& qr) mutable {
cql3::selection::result_set_builder builder(*_selection, now, &options);
query::result_view::consume(*qr.query_result, std::move(slice),
@@ -2116,6 +2133,9 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
tracing::add_table_name(state.get_trace_state(), keyspace(), column_family());
validate_for_read(options.get_consistency());
// Vector index reads go through proxy().query_result() with single-partition lookups.
state.set_latency_histogram(qp.proxy().get_stats().read);
_query_start_time_point = gc_clock::now();
update_stats();
@@ -2208,7 +2228,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
cmd->slice._row_ranges = query::clustering_row_ranges{query::clustering_range::make_singular(key.clustering)};
coordinator_result<service::storage_proxy::coordinator_query_result> rqr =
co_await qp.proxy().query_result(_schema, cmd, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(),
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()});
if (!rqr) {
co_return std::move(rqr).as_failure();
}
@@ -2228,7 +2248,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
co_return co_await qp.proxy()
.query_result(_query_schema, command, std::move(partition_ranges), options.get_consistency(),
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only},
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only, state.has_deferred_latency()},
std::nullopt)
.then(wrap_result_to_error_message([this, &options, command](service::storage_proxy::coordinator_query_result qr) {
command->set_row_limit(get_limit(options, _limit));

View File

@@ -197,7 +197,7 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
std::move(command),
std::move(ranges),
_options.get_consistency(),
{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state(), std::move(_last_replicas), _query_read_repair_decision, _options.get_specific_options().node_local_only},
{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state(), std::move(_last_replicas), _query_read_repair_decision, _options.get_specific_options().node_local_only, _state.has_deferred_latency()},
std::move(cas_shard));
}

View File

@@ -14,17 +14,35 @@
#include "tracing/tracing.hh"
#include "tracing/trace_state.hh"
#include "service_permit.hh"
#include "utils/latency.hh"
namespace utils {
class timed_rate_moving_average_summary_and_histogram;
}
namespace qos {
class service_level_controller;
}
namespace service {
// Carries a started latency counter and a pointer to the histogram
// that should be marked when the response is flushed to the client.
// The counter is started at the transport layer when the request
// arrives and the histogram pointer is set by the statement layer
// once the operation type (read/write/range/cas) is known.
// The transport layer stops the counter and marks the histogram
// after the response has been flushed to the OS socket.
struct deferred_latency_mark {
utils::latency_counter lc;
utils::timed_rate_moving_average_summary_and_histogram* histogram = nullptr;
};
class query_state final {
private:
client_state& _client_state;
tracing::trace_state_ptr _trace_state_ptr;
service_permit _permit;
std::optional<deferred_latency_mark> _deferred_latency;
public:
query_state(client_state& client_state, service_permit permit)
@@ -70,6 +88,33 @@ public:
return _client_state.get_service_level_controller();
}
// Start the latency counter. Called from the transport layer
// when the request first arrives.
void start_latency() {
_deferred_latency.emplace();
_deferred_latency->lc.start();
}
// Set the histogram that should be marked when the response
// is flushed. Called from the statement layer once the
// operation type is known.
void set_latency_histogram(utils::timed_rate_moving_average_summary_and_histogram& hist) {
if (_deferred_latency) {
_deferred_latency->histogram = &hist;
}
}
bool has_deferred_latency() const {
return _deferred_latency.has_value();
}
// Extract the deferred latency mark. The transport layer
// calls this to take ownership and mark the histogram after
// the response is flushed.
std::optional<deferred_latency_mark> take_deferred_latency() {
return std::exchange(_deferred_latency, std::nullopt);
}
};
}

View File

@@ -3979,9 +3979,8 @@ future<result<>> storage_proxy::mutate_begin(unique_response_handler_vector ids,
// this function should be called with a future that holds result of mutation attempt (usually
// future returned by mutate_begin()). The future should be ready when function is called.
future<result<>> storage_proxy::mutate_end(future<result<>> mutate_result, utils::latency_counter lc, write_stats& stats, tracing::trace_state_ptr trace_state) {
future<result<>> storage_proxy::mutate_end(future<result<>> mutate_result, write_stats& stats, tracing::trace_state_ptr trace_state) {
SCYLLA_ASSERT(mutate_result.available());
stats.write.mark(lc.stop().latency());
return utils::result_futurize_try([&] {
auto&& res = mutate_result.get();
@@ -4208,14 +4207,27 @@ future<> storage_proxy::mutate(utils::chunked_vector<mutation> mutations, db::co
}
future<result<>> storage_proxy::mutate_result(utils::chunked_vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters, coordinator_mutate_options options) {
std::optional<utils::latency_counter> lc;
if (!options.defer_coordinator_latency_mark) {
lc.emplace();
lc->start();
}
if (_cdc && _cdc->needs_cdc_augmentation(mutations)) {
return _cdc->augment_mutation_call(timeout, std::move(mutations), tr_state, cl, std::move(options.cdc_options)).then([this, cl, timeout, tr_state, permit = std::move(permit), raw_counters, cdc = _cdc->shared_from_this(), allow_limit, options = std::move(options)](std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>&& t) mutable {
return _cdc->augment_mutation_call(timeout, std::move(mutations), tr_state, cl, std::move(options.cdc_options)).then([this, cl, timeout, tr_state, permit = std::move(permit), raw_counters, cdc = _cdc->shared_from_this(), allow_limit, options = std::move(options), lc = std::move(lc)](std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>&& t) mutable {
auto mutations = std::move(std::get<0>(t));
auto tracker = std::move(std::get<1>(t));
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, std::move(tracker), std::move(options));
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, std::move(tracker), std::move(options)).finally([this, lc = std::move(lc)] () mutable {
if (lc) {
get_stats().write.mark(lc->stop().latency());
}
});
});
}
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, nullptr, std::move(options));
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, nullptr, std::move(options)).finally([this, lc = std::move(lc)] () mutable {
if (lc) {
get_stats().write.mark(lc->stop().latency());
}
});
}
future<result<>> storage_proxy::do_mutate(utils::chunked_vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters, db::allow_per_partition_rate_limit allow_limit, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker, coordinator_mutate_options options) {
@@ -4262,15 +4274,12 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, tracin
// special handling, e.g. counters. otherwise, a default type is used.
auto type = type_opt.value_or(std::next(std::begin(mutations)) == std::end(mutations) ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH);
utils::latency_counter lc;
lc.start();
return mutate_prepare(mutations, cl, type, tr_state, std::move(permit), allow_limit, std::move(options)).then(utils::result_wrap([this, cl, timeout_opt, tracker = std::move(cdc_tracker),
tr_state] (storage_proxy::unique_response_handler_vector ids) mutable {
register_cdc_operation_result_tracker(ids, tracker);
return mutate_begin(std::move(ids), cl, tr_state, timeout_opt);
})).then_wrapped([this, p = shared_from_this(), lc, tr_state] (future<result<>> f) mutable {
return p->mutate_end(std::move(f), lc, get_stats(), std::move(tr_state));
})).then_wrapped([this, p = shared_from_this(), tr_state] (future<result<>> f) mutable {
return p->mutate_end(std::move(f), get_stats(), std::move(tr_state));
});
}
@@ -4379,8 +4388,6 @@ static host_id_vector_replica_set endpoint_filter(
future<result<>>
storage_proxy::mutate_atomically_result(utils::chunked_vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, coordinator_mutate_options options) {
utils::latency_counter lc;
lc.start();
class context {
storage_proxy& _p;
@@ -4494,8 +4501,8 @@ storage_proxy::mutate_atomically_result(utils::chunked_vector<mutation> mutation
return make_exception_future<lw_shared_ptr<context>>(std::current_exception());
}
};
auto cleanup = [p = shared_from_this(), lc, tr_state] (future<result<>> f) mutable {
return p->mutate_end(std::move(f), lc, p->get_stats(), std::move(tr_state));
auto cleanup = [p = shared_from_this(), tr_state] (future<result<>> f) mutable {
return p->mutate_end(std::move(f), p->get_stats(), std::move(tr_state));
};
if (_cdc && _cdc->needs_cdc_augmentation(mutations)) {
@@ -4532,9 +4539,6 @@ future<> storage_proxy::send_to_endpoint(
write_stats& stats,
allow_hints allow_hints,
is_cancellable cancellable) {
utils::latency_counter lc;
lc.start();
std::optional<clock_type::time_point> timeout;
db::consistency_level cl = allow_hints ? db::consistency_level::ANY : db::consistency_level::ONE;
if (type == db::write_type::VIEW) {
@@ -4543,6 +4547,9 @@ future<> storage_proxy::send_to_endpoint(
timeout = clock_type::now() + 5min;
}
utils::latency_counter lc;
lc.start();
return mutate_prepare(std::array{std::move(m)},
[this, tr_state, erm = std::move(ermp), target = std::array{target}, pending_endpoints, &stats, cancellable, cl, type, /* does view building should hold a real permit */ permit = empty_service_permit()] (std::unique_ptr<mutation_holder>& m) mutable {
host_id_vector_replica_set targets;
@@ -4570,8 +4577,11 @@ future<> storage_proxy::send_to_endpoint(
cancellable);
}).then(utils::result_wrap([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable {
return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
})).then_wrapped([p = shared_from_this(), lc, &stats] (future<result<>> f) {
return p->mutate_end(std::move(f), lc, stats, nullptr).then(utils::result_into_future<result<>>);
})).then_wrapped([p = shared_from_this(), lc, &stats] (future<result<>> f) mutable {
// Internal writes (hints, view updates) don't go through the CQL
// transport layer, so mark latency here directly.
stats.write.mark(lc.stop().latency());
return p->mutate_end(std::move(f), stats, nullptr).then(utils::result_into_future<result<>>);
});
}
@@ -6739,28 +6749,41 @@ storage_proxy::do_query(schema_ptr s,
auto f = do_query_with_paxos(std::move(s), std::move(cmd), std::move(partition_ranges), cl, std::move(query_options), std::move(*cas_shard));
return utils::then_ok_result<result<storage_proxy::coordinator_query_result>>(std::move(f));
} else {
utils::latency_counter lc;
lc.start();
std::optional<utils::latency_counter> lc;
if (!query_options.defer_coordinator_latency_mark) {
lc.emplace();
lc->start();
}
auto p = shared_from_this();
if (query::is_single_partition(partition_ranges[0])) { // do not support mixed partitions (yet?)
try {
return query_singular(cmd,
auto f = query_singular(cmd,
std::move(partition_ranges),
cl,
std::move(query_options)).finally([lc, p] () mutable {
std::move(query_options));
if (!lc) {
return f;
}
return std::move(f).finally([lc = std::move(*lc), p] () mutable {
p->get_stats().read.mark(lc.stop().latency());
});
} catch (const replica::no_such_column_family&) {
get_stats().read.mark(lc.stop().latency());
if (lc) {
get_stats().read.mark(lc->stop().latency());
}
return make_empty();
}
}
return query_partition_key_range(cmd,
auto f = query_partition_key_range(cmd,
std::move(partition_ranges),
cl,
std::move(query_options)).finally([lc, p] () mutable {
std::move(query_options));
if (!lc) {
return f;
}
return std::move(f).finally([lc = std::move(*lc), p] () mutable {
p->get_stats().range.mark(lc.stop().latency());
});
}
@@ -6929,15 +6952,21 @@ future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, cas_requ
unsigned contentions = 0;
utils::latency_counter lc;
lc.start();
std::optional<utils::latency_counter> lc;
if (!query_options.defer_coordinator_latency_mark) {
lc.emplace();
lc->start();
}
bool condition_met;
try {
auto update_stats = seastar::defer ([&] {
get_stats().cas_foreground--;
write ? get_stats().cas_write.mark(lc.stop().latency()) : get_stats().cas_read.mark(lc.stop().latency());
if (lc) {
auto latency = lc->stop().latency();
(write ? get_stats().cas_write : get_stats().cas_read).mark(latency);
}
if (contentions > 0) {
write ? get_stats().cas_write_contention.add(contentions) : get_stats().cas_read_contention.add(contentions);
}
@@ -6964,7 +6993,11 @@ future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, cas_requ
++get_stats().cas_failed_read_round_optimization;
auto pr = partition_ranges; // cannot move original because it can be reused during retry
auto cqr = co_await query(schema, cmd, std::move(pr), cl, query_options);
// Always defer read latency marking for the internal read inside CAS;
// CAS has its own latency histogram (cas_read/cas_write) marked in update_stats above.
auto internal_query_options = query_options;
internal_query_options.defer_coordinator_latency_mark = true;
auto cqr = co_await query(schema, cmd, std::move(pr), cl, std::move(internal_query_options));
qr = std::move(cqr.query_result);
}

View File

@@ -130,6 +130,7 @@ public:
replicas_per_token_range preferred_replicas;
std::optional<db::read_repair_decision> read_repair_decision;
node_local_only node_local_only;
bool defer_coordinator_latency_mark = false;
storage_proxy_coordinator_query_options(storage_proxy_clock_type::time_point timeout,
service_permit permit_,
@@ -137,14 +138,16 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
replicas_per_token_range preferred_replicas = { },
std::optional<db::read_repair_decision> read_repair_decision = { },
service::node_local_only node_local_only_ = service::node_local_only::no)
service::node_local_only node_local_only_ = service::node_local_only::no,
bool defer_coordinator_latency_mark_ = false)
: _timeout(timeout)
, permit(std::move(permit_))
, cstate(client_state_)
, trace_state(std::move(trace_state))
, preferred_replicas(std::move(preferred_replicas))
, read_repair_decision(read_repair_decision)
, node_local_only(node_local_only_) {
, node_local_only(node_local_only_)
, defer_coordinator_latency_mark(defer_coordinator_latency_mark_) {
}
storage_proxy_clock_type::time_point timeout(storage_proxy& sp) const {
@@ -169,6 +172,7 @@ struct storage_proxy_coordinator_query_result {
struct storage_proxy_coordinator_mutate_options {
cdc::per_request_options cdc_options;
node_local_only node_local_only = node_local_only::no;
bool defer_coordinator_latency_mark = false;
};
class cas_request;
@@ -467,7 +471,7 @@ private:
template<typename Range>
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, coordinator_mutate_options options);
future<result<>> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<result<>> mutate_end(future<result<>> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
future<result<>> mutate_end(future<result<>> mutate_result, write_stats& stats, tracing::trace_state_ptr trace_state);
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
bool need_throttle_writes() const;
void unthrottle();

View File

@@ -80,6 +80,16 @@ using coordinator_result = exceptions::coordinator_result<T>;
namespace cql_transport {
// process_fn_return_type constructors/destructor - defined here because
// response is an incomplete type in the header.
cql_server::process_fn_return_type::process_fn_return_type(result_with_foreign_response_ptr r, std::optional<service::deferred_latency_mark> lm)
: result(std::move(r)), latency_mark(std::move(lm)) {}
cql_server::process_fn_return_type::process_fn_return_type(result_with_bounce r, std::optional<service::deferred_latency_mark> lm)
: result(std::move(r)), latency_mark(std::move(lm)) {}
cql_server::process_fn_return_type::process_fn_return_type(process_fn_return_type&&) noexcept = default;
cql_server::process_fn_return_type& cql_server::process_fn_return_type::operator=(process_fn_return_type&&) noexcept = default;
cql_server::process_fn_return_type::~process_fn_return_type() = default;
static logging::logger clogger("cql_server");
/**
@@ -503,7 +513,9 @@ future<forward_cql_execute_response> cql_server::handle_forward_execute(
std::move(req.cached_fn_calls),
handling_node_bounce::yes));
if (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&result)) {
if (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&result.result)) {
// The request needs to be redirected — don't mark latency since
// no real work was done on this node.
auto host = (*bounce_msg)->target_host();
auto shard = (*bounce_msg)->target_shard();
co_return forward_cql_execute_response{
@@ -513,7 +525,13 @@ future<forward_cql_execute_response> cql_server::handle_forward_execute(
};
}
auto& final_result = std::get<cql_server::result_with_foreign_response_ptr>(result);
// Mark latency on the target shard since forwarded requests don't
// go through the originating shard's transport flush path.
if (result.latency_mark && result.latency_mark->histogram) {
result.latency_mark->histogram->mark(result.latency_mark->lc.stop().latency());
}
auto& final_result = std::get<cql_server::result_with_foreign_response_ptr>(result.result);
if (!final_result) {
co_return co_await coroutine::try_future(final_result.assume_error().as_exception_future<forward_cql_execute_response>());
@@ -934,7 +952,7 @@ std::unique_ptr<cql_server::response> cql_server::handle_exception(int16_t strea
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
}
}
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
future<cql_server::response_with_latency>
cql_server::connection::process_request_one(fragmented_temporary_buffer::istream fbuf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit) {
using auth_state = service::client_state::auth_state;
@@ -1004,18 +1022,26 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
_version, get_dialect());
default: return make_exception_future<process_fn_return_type>(exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop))));
}
}).then_wrapped([this, cqlop, &cql_stats, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<process_fn_return_type> f) {
}).then_wrapped([this, cqlop, &cql_stats, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<process_fn_return_type> f) -> future<response_with_latency> {
auto stop_trace = defer([&] {
tracing::stop_foreground(trace_state);
});
return seastar::futurize_invoke([&] () {
// Extract latency mark so it survives error paths.
// On f.failed(), the mark was destroyed with the coroutine frame,
// so latency_mark stays nullopt — acceptable since the statement
// layer may not have run far enough to set a histogram.
std::optional<service::deferred_latency_mark> latency_mark;
std::exception_ptr eptr;
try {
if (f.failed()) {
return make_exception_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(f).get_exception());
std::rethrow_exception(std::move(f).get_exception());
}
result_with_foreign_response_ptr res = std::get<result_with_foreign_response_ptr>(f.get());
auto ret = f.get();
latency_mark = std::move(ret.latency_mark);
result_with_foreign_response_ptr res = std::get<result_with_foreign_response_ptr>(std::move(ret.result));
if (!res) {
return std::move(res).assume_error().as_exception_future<foreign_ptr<std::unique_ptr<cql_server::response>>>();
std::move(res).assume_error().throw_me();
}
auto response = std::move(res).assume_value();
@@ -1035,7 +1061,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
case auth_state::AUTHENTICATION:
// Support both SASL auth from protocol v2 and the older style Credentials auth from v1
if (cqlop != cql_binary_opcode::AUTH_RESPONSE && cqlop != cql_binary_opcode::CREDENTIALS) {
return make_exception_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop))));
throw exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop)));
}
if (res_op == cql_binary_opcode::READY || res_op == cql_binary_opcode::AUTH_SUCCESS) {
client_state.set_auth_state(auth_state::READY);
@@ -1048,15 +1074,17 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
tracing::set_response_size(trace_state, response->size());
cql_stats.response_size.add(response->size());
return make_ready_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(response));
}).handle_exception([this, stream, &client_state, trace_state] (std::exception_ptr eptr) {
auto response = _server.handle_exception(stream, eptr, trace_state, _version, client_state);
if (auto timeout = _server.timeout_for_sleep(eptr)) {
// Return read timeout exception, as we wait here until the timeout passes
return _server.sleep_until_timeout_passes(*timeout, std::move(response));
}
return utils::result_into_future<result_with_foreign_response_ptr>(std::move(response));
});
co_return response_with_latency{std::move(response), std::move(latency_mark)};
} catch (...) {
eptr = std::current_exception();
}
// Handle exception outside catch block so co_await is allowed.
auto response = _server.handle_exception(stream, eptr, trace_state, _version, client_state);
if (auto timeout = _server.timeout_for_sleep(eptr)) {
auto resp = co_await _server.sleep_until_timeout_passes(*timeout, make_foreign(std::move(response)));
co_return response_with_latency{std::move(resp), std::move(latency_mark)};
}
co_return response_with_latency{make_foreign(std::move(response)), std::move(latency_mark)};
});
}
@@ -1263,14 +1291,15 @@ future<> cql_server::connection::process_request() {
op == uint8_t (cql_binary_opcode::EXECUTE) ||
op == uint8_t(cql_binary_opcode::BATCH));
future<foreign_ptr<std::unique_ptr<cql_server::response>>> request_process_future = should_paralelize ?
future<response_with_latency> request_process_future = should_paralelize ?
_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<response_with_latency> response_f) mutable {
try {
auto& sg_stats = _server.get_cql_sg_stats();
size_t pending_response_size = 0;
std::optional<service::deferred_latency_mark> latency_mark;
if (response_f.failed()) {
const auto message = format("request processing failed, error [{}]", response_f.get_exception());
clogger.error("{}: {}", _client_state.get_remote_address(), message);
@@ -1278,7 +1307,9 @@ future<> cql_server::connection::process_request() {
message,
tracing::trace_state_ptr()));
} else {
auto response = response_f.get();
auto result = response_f.get();
latency_mark = std::move(result.latency_mark);
auto response = std::move(result.response);
// Account for response body size exceeding the initial estimate.
auto resp_size = response->size();
auto permit_size = mem_permit.count();
@@ -1291,8 +1322,13 @@ future<> cql_server::connection::process_request() {
sg_stats._pending_response_memory += pending_response_size;
write_response(std::move(response), _compression);
}
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] {
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size, latency_mark = std::move(latency_mark)] () mutable {
sg_stats._pending_response_memory -= pending_response_size;
// Stop the latency counter and mark the histogram now that
// the response has been flushed to the OS socket.
if (latency_mark && latency_mark->histogram) {
latency_mark->histogram->mark(latency_mark->lc.stop().latency());
}
});
} catch (...) {
clogger.error("{}: request processing failed: {}",
@@ -1531,6 +1567,7 @@ process_query_internal(service::client_state& client_state, sharded<cql3::query_
}
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state;
query_state.start_latency();
auto o = in.read_options(version, qp.local().get_cql_config());
if (!o) {
return make_exception_future<cql_server::process_fn_return_type>(std::move(o).assume_error());
@@ -1552,14 +1589,15 @@ process_query_internal(service::client_state& client_state, sharded<cql3::query_
}
return qp.local().execute_direct_without_checking_exception_message(query.assume_value(), query_state, dialect, options).then([q_state = std::move(q_state), stream, skip_metadata, version] (auto msg) {
auto latency_mark = q_state->query_state.take_deferred_latency();
if (msg->as_bounce()) {
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)));
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)), std::move(latency_mark));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()), std::move(latency_mark));
} else {
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, cql_metadata_id_wrapper{}, skip_metadata)));
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, cql_metadata_id_wrapper{}, skip_metadata)), std::move(latency_mark));
}
});
}
@@ -1629,6 +1667,7 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state;
query_state.start_latency();
auto o = in.read_options(version, qp.local().get_cql_config());
if (!o) {
return make_exception_future<cql_server::process_fn_return_type>(std::move(o).assume_error());
@@ -1670,13 +1709,14 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
tracing::trace(trace_state, "Processing a statement");
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
auto latency_mark = q_state->query_state.take_deferred_latency();
if (msg->as_bounce()) {
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)));
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)), std::move(latency_mark));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()), std::move(latency_mark));
} else {
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)), std::move(latency_mark));
}
});
}
@@ -1787,6 +1827,7 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto& query_state = q_state->query_state;
query_state.start_latency();
// #563. CQL v2 encodes query_options in v1 format for batch requests.
auto o = in.read_options(version, qp.local().get_cql_config());
if (!o) {
@@ -1810,14 +1851,15 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
batch->set_audit_info(batch->audit_info());
return qp.local().execute_batch_without_checking_exception_message(batch, query_state, options, std::move(pending_authorization_entries))
.then([stream, batch, q_state = std::move(q_state), trace_state = query_state.get_trace_state(), version] (auto msg) {
auto latency_mark = q_state->query_state.take_deferred_latency();
if (msg->as_bounce()) {
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)));
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)), std::move(latency_mark));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()), std::move(latency_mark));
} else {
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, trace_state, version, cql_metadata_id_wrapper{})));
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, trace_state, version, cql_metadata_id_wrapper{})), std::move(latency_mark));
}
});
}
@@ -1859,10 +1901,10 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
bool init_trace = (bool)!bounced; // If the request was bounced, we already started the trace in the handler
auto msg = co_await coroutine::try_future(process_fn(client_state, _query_processor, in, stream,
version, permit, trace_state, init_trace, {}, dialect));
while (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&msg)) {
version, permit, trace_state, init_trace, std::move(cached_fn_calls), dialect));
while (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&msg.result)) {
auto shard = (*bounce_msg)->target_shard();
auto&& cached_vals = (*bounce_msg)->take_cached_pk_function_calls();
auto cached_vals = (*bounce_msg)->take_cached_pk_function_calls();
auto target_host = (*bounce_msg)->target_host();
auto my_host_id = _query_processor.local().proxy().get_token_metadata_ptr()->get_topology().my_host_id();
if (target_host == my_host_id) {
@@ -1870,13 +1912,25 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
auto sg = _config.bounce_request_smp_service_group;
auto gcs = client_state.move_to_other_shard();
auto gt = tracing::global_trace_state_ptr(trace_state);
msg = co_await container().invoke_on(shard, sg, [&, stream, dialect, version] (cql_server& server) -> future<process_fn_return_type> {
msg = co_await container().invoke_on(shard, sg, [&, stream, dialect, version, cached_vals = std::move(cached_vals)] (cql_server& server) mutable -> future<process_fn_return_type> {
bytes_ostream linearization_buffer;
request_reader in(is, linearization_buffer);
auto local_client_state = gcs.get(&server._abort_source);
auto local_trace_state = gt.get();
co_return co_await process_fn(local_client_state, server._query_processor, in, stream, version,
/* FIXME */empty_service_permit(), std::move(local_trace_state), false, cached_vals, dialect);
auto ret = co_await process_fn(local_client_state, server._query_processor, in, stream, version,
/* FIXME */empty_service_permit(), std::move(local_trace_state), false, std::move(cached_vals), dialect);
// Mark latency on the target shard before returning.
// The histogram pointer belongs to this shard's stats and
// must not be dereferenced from another shard.
// Only mark when the result is not a bounce — a bounce means
// the request will be retried elsewhere and no real work was done.
if (!std::get_if<cql_server::result_with_bounce>(&ret.result)) {
if (ret.latency_mark && ret.latency_mark->histogram) {
ret.latency_mark->histogram->mark(ret.latency_mark->lc.stop().latency());
}
}
ret.latency_mark.reset();
co_return ret;
});
} else {
// Node bounce
@@ -1897,7 +1951,7 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
.dialect = dialect,
.client_state = client_state,
.trace_info = tracing::make_trace_info(trace_state),
.cached_fn_calls = std::move(cached_fn_calls),
.cached_fn_calls = std::move(cached_vals),
};
auto response = co_await forward_cql(

View File

@@ -250,7 +250,17 @@ public:
using response = cql_transport::response;
using result_with_foreign_response_ptr = exceptions::coordinator_result<foreign_ptr<std::unique_ptr<cql_server::response>>>;
using result_with_bounce = foreign_ptr<seastar::shared_ptr<messages::result_message::bounce>>;
using process_fn_return_type = std::variant<result_with_foreign_response_ptr, result_with_bounce>;
using process_fn_result = std::variant<result_with_foreign_response_ptr, result_with_bounce>;
struct process_fn_return_type {
process_fn_result result;
std::optional<service::deferred_latency_mark> latency_mark;
process_fn_return_type(result_with_foreign_response_ptr r, std::optional<service::deferred_latency_mark> lm = std::nullopt);
process_fn_return_type(result_with_bounce r, std::optional<service::deferred_latency_mark> lm = std::nullopt);
process_fn_return_type(process_fn_return_type&&) noexcept;
process_fn_return_type& operator=(process_fn_return_type&&) noexcept;
~process_fn_return_type();
};
service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept;
service::migration_listener* get_migration_listener() const noexcept;
@@ -287,6 +297,11 @@ private:
std::optional<seastar::lowres_clock::time_point> timeout_for_sleep(std::exception_ptr eptr) const;
future<foreign_ptr<std::unique_ptr<cql_server::response>>> sleep_until_timeout_passes(const seastar::lowres_clock::time_point& timeout, foreign_ptr<std::unique_ptr<cql_server::response>>&& resp);
struct response_with_latency {
foreign_ptr<std::unique_ptr<cql_server::response>> response;
std::optional<service::deferred_latency_mark> latency_mark;
};
class connection : public generic_server::connection {
cql_server& _server;
socket_address _server_addr;
@@ -308,7 +323,7 @@ private:
};
private:
using execution_stage_type = inheriting_concrete_execution_stage<
future<foreign_ptr<std::unique_ptr<cql_server::response>>>,
future<response_with_latency>,
cql_server::connection*,
fragmented_temporary_buffer::istream,
uint8_t,
@@ -330,7 +345,7 @@ private:
private:
friend class process_request_executor;
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_request_one(fragmented_temporary_buffer::istream buf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit);
future<response_with_latency> process_request_one(fragmented_temporary_buffer::istream buf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit);
unsigned frame_size() const;
unsigned pick_request_cpu();
utils::result_with_exception<cql_binary_frame_v3, exceptions::protocol_exception, class cql_frame_error> parse_frame(temporary_buffer<char> buf) const;