mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Compare commits
1 Commits
master
...
ykaul/late
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d44ee3bb4 |
@@ -275,9 +275,12 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
||||
_statements[i].statement->restrictions().validate_primary_key(options.for_statement(i));
|
||||
}
|
||||
|
||||
// Set the histogram for deferred latency marking.
|
||||
auto& stats = qp.proxy().get_stats();
|
||||
if (_has_conditions) {
|
||||
++_stats.cas_batches;
|
||||
_stats.statements_in_cas_batches += _statements.size();
|
||||
query_state.set_latency_histogram(stats.cas_write);
|
||||
return execute_with_conditions(qp, options, query_state).then([guardrail_state, cl] (auto result) {
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
|
||||
result->add_warning(format("Using write consistency level {} listed on the "
|
||||
@@ -290,10 +293,13 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
||||
++_stats.batches;
|
||||
_stats.statements_in_batches += _statements.size();
|
||||
|
||||
query_state.set_latency_histogram(stats.write);
|
||||
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
|
||||
auto defer_latency = query_state.has_deferred_latency();
|
||||
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, cl, timeout, tr_state = query_state.get_trace_state(),
|
||||
permit = query_state.get_permit()] (utils::chunked_vector<mutation> ms) mutable {
|
||||
return execute_without_conditions(qp, std::move(ms), cl, timeout, std::move(tr_state), std::move(permit));
|
||||
permit = query_state.get_permit(), defer_latency] (utils::chunked_vector<mutation> ms) mutable {
|
||||
return execute_without_conditions(qp, std::move(ms), cl, timeout, std::move(tr_state), std::move(permit), defer_latency);
|
||||
}).then([guardrail_state, cl] (coordinator_result<> res) {
|
||||
if (!res) {
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
|
||||
@@ -314,7 +320,8 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||
db::consistency_level cl,
|
||||
db::timeout_clock::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state,
|
||||
service_permit permit) const
|
||||
service_permit permit,
|
||||
bool defer_coordinator_latency_mark) const
|
||||
{
|
||||
// FIXME: do we need to do this?
|
||||
#if 0
|
||||
@@ -341,7 +348,9 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||
mutate_atomic = false;
|
||||
}
|
||||
}
|
||||
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit), db::allow_per_partition_rate_limit::yes);
|
||||
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit), db::allow_per_partition_rate_limit::yes, false, {
|
||||
.defer_coordinator_latency_mark = defer_coordinator_latency_mark,
|
||||
});
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_with_conditions(
|
||||
@@ -402,7 +411,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
|
||||
auto* request_ptr = request.get();
|
||||
return qp.proxy().cas(schema, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
|
||||
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
|
||||
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state(), {}, {}, service::node_local_only::no, qs.has_deferred_latency()},
|
||||
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request = std::move(request)] (bool is_applied) {
|
||||
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
|
||||
});
|
||||
@@ -490,4 +499,3 @@ audit::statement_category batch_statement::category() const {
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -140,7 +140,8 @@ private:
|
||||
db::consistency_level cl,
|
||||
db::timeout_clock::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state,
|
||||
service_permit permit) const;
|
||||
service_permit permit,
|
||||
bool defer_coordinator_latency_mark) const;
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> execute_with_conditions(
|
||||
query_processor& qp,
|
||||
|
||||
@@ -281,7 +281,10 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
|
||||
|
||||
_restrictions->validate_primary_key(options);
|
||||
|
||||
// Set the histogram for deferred latency marking.
|
||||
auto& stats = qp.proxy().get_stats();
|
||||
if (has_conditions()) {
|
||||
qs.set_latency_histogram(stats.cas_write);
|
||||
auto result = co_await execute_with_condition(qp, qs, options);
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
|
||||
result->add_warning(format("Using write consistency level {} listed on the "
|
||||
@@ -290,6 +293,8 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
|
||||
co_return result;
|
||||
}
|
||||
|
||||
qs.set_latency_histogram(stats.write);
|
||||
|
||||
json_cache_opt json_cache = maybe_prepare_json_cache(options);
|
||||
std::vector<dht::partition_range> keys = build_partition_keys(options, json_cache);
|
||||
|
||||
@@ -334,7 +339,8 @@ modification_statement::execute_without_condition(query_processor& qp, service::
|
||||
}
|
||||
|
||||
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, false, qs.get_trace_state(), qs.get_permit(), db::allow_per_partition_rate_limit::yes, this->is_raw_counter_shard_write(), {
|
||||
.node_local_only = options.get_specific_options().node_local_only
|
||||
.node_local_only = options.get_specific_options().node_local_only,
|
||||
.defer_coordinator_latency_mark = qs.has_deferred_latency(),
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -450,7 +456,7 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
|
||||
}
|
||||
|
||||
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
|
||||
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
|
||||
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state(), {}, {}, service::node_local_only::no, qs.has_deferred_latency()},
|
||||
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
|
||||
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
|
||||
result->add_tablet_info(tablet_replicas, token_range);
|
||||
|
||||
@@ -482,6 +482,19 @@ select_statement::do_execute(query_processor& qp,
|
||||
|
||||
auto key_ranges = _restrictions->get_partition_key_ranges(options);
|
||||
|
||||
// Set the histogram for deferred latency marking.
|
||||
// Serial consistency reads go through paxos (cas_read histogram),
|
||||
// non-serial reads use read or range histograms.
|
||||
{
|
||||
auto& stats = qp.proxy().get_stats();
|
||||
if (db::is_serial_consistency(options.get_consistency())) {
|
||||
state.set_latency_histogram(stats.cas_read);
|
||||
} else {
|
||||
bool is_range = key_ranges.empty() || !query::is_single_partition(key_ranges.front());
|
||||
state.set_latency_histogram(is_range ? stats.range : stats.read);
|
||||
}
|
||||
}
|
||||
|
||||
auto token = dht::token();
|
||||
std::optional<locator::tablet_routing_info> tablet_info = {};
|
||||
|
||||
@@ -765,7 +778,7 @@ view_indexed_table_select_statement::do_execute_base_query(
|
||||
if (previous_result_size < query::result_memory_limiter::maximum_result_size && concurrency < max_base_table_query_concurrency) {
|
||||
concurrency *= 2;
|
||||
}
|
||||
coordinator_result<service::storage_proxy::coordinator_query_result> rqr = co_await qp.proxy().query_result(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
|
||||
coordinator_result<service::storage_proxy::coordinator_query_result> rqr = co_await qp.proxy().query_result(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()});
|
||||
if (!rqr.has_value()) {
|
||||
co_return std::move(rqr).as_failure();
|
||||
}
|
||||
@@ -837,7 +850,7 @@ view_indexed_table_select_statement::do_execute_base_query(
|
||||
command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
|
||||
}
|
||||
coordinator_result<service::storage_proxy::coordinator_query_result> rqr
|
||||
= co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
|
||||
= co_await qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()});
|
||||
if (!rqr.has_value()) {
|
||||
co_return std::move(rqr).as_failure();
|
||||
}
|
||||
@@ -912,7 +925,7 @@ select_statement::execute_without_checking_exception_message_non_aggregate_unpag
|
||||
command,
|
||||
std::move(prange),
|
||||
options.get_consistency(),
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only},
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only, state.has_deferred_latency()},
|
||||
cas_shard).then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) {
|
||||
return make_ready_future<coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(qr.query_result));
|
||||
}));
|
||||
@@ -921,7 +934,7 @@ select_statement::execute_without_checking_exception_message_non_aggregate_unpag
|
||||
return this->process_results(std::move(result), cmd, options, now);
|
||||
}));
|
||||
} else {
|
||||
return qp.proxy().query_result(_query_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only}, std::move(cas_shard))
|
||||
return qp.proxy().query_result(_query_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only, state.has_deferred_latency()}, std::move(cas_shard))
|
||||
.then(wrap_result_to_error_message([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
|
||||
return this->process_results(std::move(qr.query_result), cmd, options, now);
|
||||
}));
|
||||
@@ -1199,6 +1212,10 @@ view_indexed_table_select_statement::actually_do_execute(query_processor& qp,
|
||||
|
||||
validate_for_read(cl);
|
||||
|
||||
// Secondary index reads always go through proxy().query_result().
|
||||
// Mark as read since these are always single-partition lookups by primary key.
|
||||
state.set_latency_histogram(qp.proxy().get_stats().read);
|
||||
|
||||
auto now = gc_clock::now();
|
||||
|
||||
++_stats.secondary_index_reads;
|
||||
@@ -1431,7 +1448,7 @@ view_indexed_table_select_statement::read_posting_list(query_processor& qp,
|
||||
|
||||
int32_t page_size = options.get_page_size();
|
||||
if (page_size <= 0 || !service::pager::query_pagers::may_need_paging(*_view_schema, page_size, *cmd, partition_ranges)) {
|
||||
return qp.proxy().query_result(_view_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
|
||||
return qp.proxy().query_result(_view_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()})
|
||||
.then(utils::result_wrap([this, now, &options, selection = std::move(selection), partition_slice = std::move(partition_slice)] (service::storage_proxy::coordinator_query_result qr)
|
||||
-> coordinator_result<::shared_ptr<cql_transport::messages::result_message::rows>> {
|
||||
cql3::selection::result_set_builder builder(*selection, now, &options);
|
||||
@@ -1883,7 +1900,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
|
||||
|| !service::pager::query_pagers::may_need_paging(*_schema, page_size,
|
||||
*command, key_ranges))) {
|
||||
return do_query(erm_keepalive, {}, qp.proxy(), _schema, command, std::move(key_ranges), cl,
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}})
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()})
|
||||
.then(wrap_result_to_error_message([this, erm_keepalive, now, &options, slice = command->slice] (service::storage_proxy_coordinator_query_result&& qr) mutable {
|
||||
cql3::selection::result_set_builder builder(*_selection, now, &options);
|
||||
query::result_view::consume(*qr.query_result, std::move(slice),
|
||||
@@ -2116,6 +2133,9 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
|
||||
tracing::add_table_name(state.get_trace_state(), keyspace(), column_family());
|
||||
validate_for_read(options.get_consistency());
|
||||
|
||||
// Vector index reads go through proxy().query_result() with single-partition lookups.
|
||||
state.set_latency_histogram(qp.proxy().get_stats().read);
|
||||
|
||||
_query_start_time_point = gc_clock::now();
|
||||
|
||||
update_stats();
|
||||
@@ -2208,7 +2228,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
|
||||
cmd->slice._row_ranges = query::clustering_row_ranges{query::clustering_range::make_singular(key.clustering)};
|
||||
coordinator_result<service::storage_proxy::coordinator_query_result> rqr =
|
||||
co_await qp.proxy().query_result(_schema, cmd, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(),
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()});
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, service::node_local_only::no, state.has_deferred_latency()});
|
||||
if (!rqr) {
|
||||
co_return std::move(rqr).as_failure();
|
||||
}
|
||||
@@ -2228,7 +2248,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_tab
|
||||
|
||||
co_return co_await qp.proxy()
|
||||
.query_result(_query_schema, command, std::move(partition_ranges), options.get_consistency(),
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only},
|
||||
{timeout, state.get_permit(), state.get_client_state(), state.get_trace_state(), {}, {}, options.get_specific_options().node_local_only, state.has_deferred_latency()},
|
||||
std::nullopt)
|
||||
.then(wrap_result_to_error_message([this, &options, command](service::storage_proxy::coordinator_query_result qr) {
|
||||
command->set_row_limit(get_limit(options, _limit));
|
||||
|
||||
@@ -197,7 +197,7 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
|
||||
std::move(command),
|
||||
std::move(ranges),
|
||||
_options.get_consistency(),
|
||||
{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state(), std::move(_last_replicas), _query_read_repair_decision, _options.get_specific_options().node_local_only},
|
||||
{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state(), std::move(_last_replicas), _query_read_repair_decision, _options.get_specific_options().node_local_only, _state.has_deferred_latency()},
|
||||
std::move(cas_shard));
|
||||
}
|
||||
|
||||
|
||||
@@ -14,17 +14,35 @@
|
||||
#include "tracing/tracing.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "service_permit.hh"
|
||||
#include "utils/latency.hh"
|
||||
|
||||
namespace utils {
|
||||
class timed_rate_moving_average_summary_and_histogram;
|
||||
}
|
||||
|
||||
namespace qos {
|
||||
class service_level_controller;
|
||||
}
|
||||
namespace service {
|
||||
|
||||
// Carries a started latency counter and a pointer to the histogram
|
||||
// that should be marked when the response is flushed to the client.
|
||||
// The counter is started at the transport layer when the request
|
||||
// arrives and the histogram pointer is set by the statement layer
|
||||
// once the operation type (read/write/range/cas) is known.
|
||||
// The transport layer stops the counter and marks the histogram
|
||||
// after the response has been flushed to the OS socket.
|
||||
struct deferred_latency_mark {
|
||||
utils::latency_counter lc;
|
||||
utils::timed_rate_moving_average_summary_and_histogram* histogram = nullptr;
|
||||
};
|
||||
|
||||
class query_state final {
|
||||
private:
|
||||
client_state& _client_state;
|
||||
tracing::trace_state_ptr _trace_state_ptr;
|
||||
service_permit _permit;
|
||||
std::optional<deferred_latency_mark> _deferred_latency;
|
||||
|
||||
public:
|
||||
query_state(client_state& client_state, service_permit permit)
|
||||
@@ -70,6 +88,33 @@ public:
|
||||
return _client_state.get_service_level_controller();
|
||||
}
|
||||
|
||||
// Start the latency counter. Called from the transport layer
|
||||
// when the request first arrives.
|
||||
void start_latency() {
|
||||
_deferred_latency.emplace();
|
||||
_deferred_latency->lc.start();
|
||||
}
|
||||
|
||||
// Set the histogram that should be marked when the response
|
||||
// is flushed. Called from the statement layer once the
|
||||
// operation type is known.
|
||||
void set_latency_histogram(utils::timed_rate_moving_average_summary_and_histogram& hist) {
|
||||
if (_deferred_latency) {
|
||||
_deferred_latency->histogram = &hist;
|
||||
}
|
||||
}
|
||||
|
||||
bool has_deferred_latency() const {
|
||||
return _deferred_latency.has_value();
|
||||
}
|
||||
|
||||
// Extract the deferred latency mark. The transport layer
|
||||
// calls this to take ownership and mark the histogram after
|
||||
// the response is flushed.
|
||||
std::optional<deferred_latency_mark> take_deferred_latency() {
|
||||
return std::exchange(_deferred_latency, std::nullopt);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -3979,9 +3979,8 @@ future<result<>> storage_proxy::mutate_begin(unique_response_handler_vector ids,
|
||||
|
||||
// this function should be called with a future that holds result of mutation attempt (usually
|
||||
// future returned by mutate_begin()). The future should be ready when function is called.
|
||||
future<result<>> storage_proxy::mutate_end(future<result<>> mutate_result, utils::latency_counter lc, write_stats& stats, tracing::trace_state_ptr trace_state) {
|
||||
future<result<>> storage_proxy::mutate_end(future<result<>> mutate_result, write_stats& stats, tracing::trace_state_ptr trace_state) {
|
||||
SCYLLA_ASSERT(mutate_result.available());
|
||||
stats.write.mark(lc.stop().latency());
|
||||
|
||||
return utils::result_futurize_try([&] {
|
||||
auto&& res = mutate_result.get();
|
||||
@@ -4208,14 +4207,27 @@ future<> storage_proxy::mutate(utils::chunked_vector<mutation> mutations, db::co
|
||||
}
|
||||
|
||||
future<result<>> storage_proxy::mutate_result(utils::chunked_vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters, coordinator_mutate_options options) {
|
||||
std::optional<utils::latency_counter> lc;
|
||||
if (!options.defer_coordinator_latency_mark) {
|
||||
lc.emplace();
|
||||
lc->start();
|
||||
}
|
||||
if (_cdc && _cdc->needs_cdc_augmentation(mutations)) {
|
||||
return _cdc->augment_mutation_call(timeout, std::move(mutations), tr_state, cl, std::move(options.cdc_options)).then([this, cl, timeout, tr_state, permit = std::move(permit), raw_counters, cdc = _cdc->shared_from_this(), allow_limit, options = std::move(options)](std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>&& t) mutable {
|
||||
return _cdc->augment_mutation_call(timeout, std::move(mutations), tr_state, cl, std::move(options.cdc_options)).then([this, cl, timeout, tr_state, permit = std::move(permit), raw_counters, cdc = _cdc->shared_from_this(), allow_limit, options = std::move(options), lc = std::move(lc)](std::tuple<utils::chunked_vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>&& t) mutable {
|
||||
auto mutations = std::move(std::get<0>(t));
|
||||
auto tracker = std::move(std::get<1>(t));
|
||||
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, std::move(tracker), std::move(options));
|
||||
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, std::move(tracker), std::move(options)).finally([this, lc = std::move(lc)] () mutable {
|
||||
if (lc) {
|
||||
get_stats().write.mark(lc->stop().latency());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, nullptr, std::move(options));
|
||||
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, nullptr, std::move(options)).finally([this, lc = std::move(lc)] () mutable {
|
||||
if (lc) {
|
||||
get_stats().write.mark(lc->stop().latency());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<result<>> storage_proxy::do_mutate(utils::chunked_vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters, db::allow_per_partition_rate_limit allow_limit, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker, coordinator_mutate_options options) {
|
||||
@@ -4262,15 +4274,12 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, tracin
|
||||
// special handling, e.g. counters. otherwise, a default type is used.
|
||||
auto type = type_opt.value_or(std::next(std::begin(mutations)) == std::end(mutations) ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH);
|
||||
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
return mutate_prepare(mutations, cl, type, tr_state, std::move(permit), allow_limit, std::move(options)).then(utils::result_wrap([this, cl, timeout_opt, tracker = std::move(cdc_tracker),
|
||||
tr_state] (storage_proxy::unique_response_handler_vector ids) mutable {
|
||||
register_cdc_operation_result_tracker(ids, tracker);
|
||||
return mutate_begin(std::move(ids), cl, tr_state, timeout_opt);
|
||||
})).then_wrapped([this, p = shared_from_this(), lc, tr_state] (future<result<>> f) mutable {
|
||||
return p->mutate_end(std::move(f), lc, get_stats(), std::move(tr_state));
|
||||
})).then_wrapped([this, p = shared_from_this(), tr_state] (future<result<>> f) mutable {
|
||||
return p->mutate_end(std::move(f), get_stats(), std::move(tr_state));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -4379,8 +4388,6 @@ static host_id_vector_replica_set endpoint_filter(
|
||||
|
||||
future<result<>>
|
||||
storage_proxy::mutate_atomically_result(utils::chunked_vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, coordinator_mutate_options options) {
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
class context {
|
||||
storage_proxy& _p;
|
||||
@@ -4494,8 +4501,8 @@ storage_proxy::mutate_atomically_result(utils::chunked_vector<mutation> mutation
|
||||
return make_exception_future<lw_shared_ptr<context>>(std::current_exception());
|
||||
}
|
||||
};
|
||||
auto cleanup = [p = shared_from_this(), lc, tr_state] (future<result<>> f) mutable {
|
||||
return p->mutate_end(std::move(f), lc, p->get_stats(), std::move(tr_state));
|
||||
auto cleanup = [p = shared_from_this(), tr_state] (future<result<>> f) mutable {
|
||||
return p->mutate_end(std::move(f), p->get_stats(), std::move(tr_state));
|
||||
};
|
||||
|
||||
if (_cdc && _cdc->needs_cdc_augmentation(mutations)) {
|
||||
@@ -4532,9 +4539,6 @@ future<> storage_proxy::send_to_endpoint(
|
||||
write_stats& stats,
|
||||
allow_hints allow_hints,
|
||||
is_cancellable cancellable) {
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
std::optional<clock_type::time_point> timeout;
|
||||
db::consistency_level cl = allow_hints ? db::consistency_level::ANY : db::consistency_level::ONE;
|
||||
if (type == db::write_type::VIEW) {
|
||||
@@ -4543,6 +4547,9 @@ future<> storage_proxy::send_to_endpoint(
|
||||
timeout = clock_type::now() + 5min;
|
||||
}
|
||||
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
return mutate_prepare(std::array{std::move(m)},
|
||||
[this, tr_state, erm = std::move(ermp), target = std::array{target}, pending_endpoints, &stats, cancellable, cl, type, /* does view building should hold a real permit */ permit = empty_service_permit()] (std::unique_ptr<mutation_holder>& m) mutable {
|
||||
host_id_vector_replica_set targets;
|
||||
@@ -4570,8 +4577,11 @@ future<> storage_proxy::send_to_endpoint(
|
||||
cancellable);
|
||||
}).then(utils::result_wrap([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable {
|
||||
return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout));
|
||||
})).then_wrapped([p = shared_from_this(), lc, &stats] (future<result<>> f) {
|
||||
return p->mutate_end(std::move(f), lc, stats, nullptr).then(utils::result_into_future<result<>>);
|
||||
})).then_wrapped([p = shared_from_this(), lc, &stats] (future<result<>> f) mutable {
|
||||
// Internal writes (hints, view updates) don't go through the CQL
|
||||
// transport layer, so mark latency here directly.
|
||||
stats.write.mark(lc.stop().latency());
|
||||
return p->mutate_end(std::move(f), stats, nullptr).then(utils::result_into_future<result<>>);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -6739,28 +6749,41 @@ storage_proxy::do_query(schema_ptr s,
|
||||
auto f = do_query_with_paxos(std::move(s), std::move(cmd), std::move(partition_ranges), cl, std::move(query_options), std::move(*cas_shard));
|
||||
return utils::then_ok_result<result<storage_proxy::coordinator_query_result>>(std::move(f));
|
||||
} else {
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
std::optional<utils::latency_counter> lc;
|
||||
if (!query_options.defer_coordinator_latency_mark) {
|
||||
lc.emplace();
|
||||
lc->start();
|
||||
}
|
||||
auto p = shared_from_this();
|
||||
|
||||
if (query::is_single_partition(partition_ranges[0])) { // do not support mixed partitions (yet?)
|
||||
try {
|
||||
return query_singular(cmd,
|
||||
auto f = query_singular(cmd,
|
||||
std::move(partition_ranges),
|
||||
cl,
|
||||
std::move(query_options)).finally([lc, p] () mutable {
|
||||
std::move(query_options));
|
||||
if (!lc) {
|
||||
return f;
|
||||
}
|
||||
return std::move(f).finally([lc = std::move(*lc), p] () mutable {
|
||||
p->get_stats().read.mark(lc.stop().latency());
|
||||
});
|
||||
} catch (const replica::no_such_column_family&) {
|
||||
get_stats().read.mark(lc.stop().latency());
|
||||
if (lc) {
|
||||
get_stats().read.mark(lc->stop().latency());
|
||||
}
|
||||
return make_empty();
|
||||
}
|
||||
}
|
||||
|
||||
return query_partition_key_range(cmd,
|
||||
auto f = query_partition_key_range(cmd,
|
||||
std::move(partition_ranges),
|
||||
cl,
|
||||
std::move(query_options)).finally([lc, p] () mutable {
|
||||
std::move(query_options));
|
||||
if (!lc) {
|
||||
return f;
|
||||
}
|
||||
return std::move(f).finally([lc = std::move(*lc), p] () mutable {
|
||||
p->get_stats().range.mark(lc.stop().latency());
|
||||
});
|
||||
}
|
||||
@@ -6929,15 +6952,21 @@ future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, cas_requ
|
||||
|
||||
unsigned contentions = 0;
|
||||
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
std::optional<utils::latency_counter> lc;
|
||||
if (!query_options.defer_coordinator_latency_mark) {
|
||||
lc.emplace();
|
||||
lc->start();
|
||||
}
|
||||
|
||||
bool condition_met;
|
||||
|
||||
try {
|
||||
auto update_stats = seastar::defer ([&] {
|
||||
get_stats().cas_foreground--;
|
||||
write ? get_stats().cas_write.mark(lc.stop().latency()) : get_stats().cas_read.mark(lc.stop().latency());
|
||||
if (lc) {
|
||||
auto latency = lc->stop().latency();
|
||||
(write ? get_stats().cas_write : get_stats().cas_read).mark(latency);
|
||||
}
|
||||
if (contentions > 0) {
|
||||
write ? get_stats().cas_write_contention.add(contentions) : get_stats().cas_read_contention.add(contentions);
|
||||
}
|
||||
@@ -6964,7 +6993,11 @@ future<bool> storage_proxy::cas(schema_ptr schema, cas_shard cas_shard, cas_requ
|
||||
++get_stats().cas_failed_read_round_optimization;
|
||||
|
||||
auto pr = partition_ranges; // cannot move original because it can be reused during retry
|
||||
auto cqr = co_await query(schema, cmd, std::move(pr), cl, query_options);
|
||||
// Always defer read latency marking for the internal read inside CAS;
|
||||
// CAS has its own latency histogram (cas_read/cas_write) marked in update_stats above.
|
||||
auto internal_query_options = query_options;
|
||||
internal_query_options.defer_coordinator_latency_mark = true;
|
||||
auto cqr = co_await query(schema, cmd, std::move(pr), cl, std::move(internal_query_options));
|
||||
qr = std::move(cqr.query_result);
|
||||
}
|
||||
|
||||
|
||||
@@ -130,6 +130,7 @@ public:
|
||||
replicas_per_token_range preferred_replicas;
|
||||
std::optional<db::read_repair_decision> read_repair_decision;
|
||||
node_local_only node_local_only;
|
||||
bool defer_coordinator_latency_mark = false;
|
||||
|
||||
storage_proxy_coordinator_query_options(storage_proxy_clock_type::time_point timeout,
|
||||
service_permit permit_,
|
||||
@@ -137,14 +138,16 @@ public:
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
replicas_per_token_range preferred_replicas = { },
|
||||
std::optional<db::read_repair_decision> read_repair_decision = { },
|
||||
service::node_local_only node_local_only_ = service::node_local_only::no)
|
||||
service::node_local_only node_local_only_ = service::node_local_only::no,
|
||||
bool defer_coordinator_latency_mark_ = false)
|
||||
: _timeout(timeout)
|
||||
, permit(std::move(permit_))
|
||||
, cstate(client_state_)
|
||||
, trace_state(std::move(trace_state))
|
||||
, preferred_replicas(std::move(preferred_replicas))
|
||||
, read_repair_decision(read_repair_decision)
|
||||
, node_local_only(node_local_only_) {
|
||||
, node_local_only(node_local_only_)
|
||||
, defer_coordinator_latency_mark(defer_coordinator_latency_mark_) {
|
||||
}
|
||||
|
||||
storage_proxy_clock_type::time_point timeout(storage_proxy& sp) const {
|
||||
@@ -169,6 +172,7 @@ struct storage_proxy_coordinator_query_result {
|
||||
struct storage_proxy_coordinator_mutate_options {
|
||||
cdc::per_request_options cdc_options;
|
||||
node_local_only node_local_only = node_local_only::no;
|
||||
bool defer_coordinator_latency_mark = false;
|
||||
};
|
||||
|
||||
class cas_request;
|
||||
@@ -467,7 +471,7 @@ private:
|
||||
template<typename Range>
|
||||
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, coordinator_mutate_options options);
|
||||
future<result<>> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
|
||||
future<result<>> mutate_end(future<result<>> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
|
||||
future<result<>> mutate_end(future<result<>> mutate_result, write_stats& stats, tracing::trace_state_ptr trace_state);
|
||||
future<result<>> schedule_repair(locator::effective_replication_map_ptr ermp, mutations_per_partition_key_map diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
bool need_throttle_writes() const;
|
||||
void unthrottle();
|
||||
|
||||
@@ -80,6 +80,16 @@ using coordinator_result = exceptions::coordinator_result<T>;
|
||||
|
||||
namespace cql_transport {
|
||||
|
||||
// process_fn_return_type constructors/destructor - defined here because
|
||||
// response is an incomplete type in the header.
|
||||
cql_server::process_fn_return_type::process_fn_return_type(result_with_foreign_response_ptr r, std::optional<service::deferred_latency_mark> lm)
|
||||
: result(std::move(r)), latency_mark(std::move(lm)) {}
|
||||
cql_server::process_fn_return_type::process_fn_return_type(result_with_bounce r, std::optional<service::deferred_latency_mark> lm)
|
||||
: result(std::move(r)), latency_mark(std::move(lm)) {}
|
||||
cql_server::process_fn_return_type::process_fn_return_type(process_fn_return_type&&) noexcept = default;
|
||||
cql_server::process_fn_return_type& cql_server::process_fn_return_type::operator=(process_fn_return_type&&) noexcept = default;
|
||||
cql_server::process_fn_return_type::~process_fn_return_type() = default;
|
||||
|
||||
static logging::logger clogger("cql_server");
|
||||
|
||||
/**
|
||||
@@ -503,7 +513,9 @@ future<forward_cql_execute_response> cql_server::handle_forward_execute(
|
||||
std::move(req.cached_fn_calls),
|
||||
handling_node_bounce::yes));
|
||||
|
||||
if (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&result)) {
|
||||
if (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&result.result)) {
|
||||
// The request needs to be redirected — don't mark latency since
|
||||
// no real work was done on this node.
|
||||
auto host = (*bounce_msg)->target_host();
|
||||
auto shard = (*bounce_msg)->target_shard();
|
||||
co_return forward_cql_execute_response{
|
||||
@@ -513,7 +525,13 @@ future<forward_cql_execute_response> cql_server::handle_forward_execute(
|
||||
};
|
||||
}
|
||||
|
||||
auto& final_result = std::get<cql_server::result_with_foreign_response_ptr>(result);
|
||||
// Mark latency on the target shard since forwarded requests don't
|
||||
// go through the originating shard's transport flush path.
|
||||
if (result.latency_mark && result.latency_mark->histogram) {
|
||||
result.latency_mark->histogram->mark(result.latency_mark->lc.stop().latency());
|
||||
}
|
||||
|
||||
auto& final_result = std::get<cql_server::result_with_foreign_response_ptr>(result.result);
|
||||
|
||||
if (!final_result) {
|
||||
co_return co_await coroutine::try_future(final_result.assume_error().as_exception_future<forward_cql_execute_response>());
|
||||
@@ -934,7 +952,7 @@ std::unique_ptr<cql_server::response> cql_server::handle_exception(int16_t strea
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
|
||||
}
|
||||
}
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
future<cql_server::response_with_latency>
|
||||
cql_server::connection::process_request_one(fragmented_temporary_buffer::istream fbuf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit) {
|
||||
using auth_state = service::client_state::auth_state;
|
||||
|
||||
@@ -1004,18 +1022,26 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
_version, get_dialect());
|
||||
default: return make_exception_future<process_fn_return_type>(exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop))));
|
||||
}
|
||||
}).then_wrapped([this, cqlop, &cql_stats, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<process_fn_return_type> f) {
|
||||
}).then_wrapped([this, cqlop, &cql_stats, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<process_fn_return_type> f) -> future<response_with_latency> {
|
||||
auto stop_trace = defer([&] {
|
||||
tracing::stop_foreground(trace_state);
|
||||
});
|
||||
return seastar::futurize_invoke([&] () {
|
||||
// Extract latency mark so it survives error paths.
|
||||
// On f.failed(), the mark was destroyed with the coroutine frame,
|
||||
// so latency_mark stays nullopt — acceptable since the statement
|
||||
// layer may not have run far enough to set a histogram.
|
||||
std::optional<service::deferred_latency_mark> latency_mark;
|
||||
std::exception_ptr eptr;
|
||||
try {
|
||||
if (f.failed()) {
|
||||
return make_exception_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(f).get_exception());
|
||||
std::rethrow_exception(std::move(f).get_exception());
|
||||
}
|
||||
|
||||
result_with_foreign_response_ptr res = std::get<result_with_foreign_response_ptr>(f.get());
|
||||
auto ret = f.get();
|
||||
latency_mark = std::move(ret.latency_mark);
|
||||
result_with_foreign_response_ptr res = std::get<result_with_foreign_response_ptr>(std::move(ret.result));
|
||||
if (!res) {
|
||||
return std::move(res).assume_error().as_exception_future<foreign_ptr<std::unique_ptr<cql_server::response>>>();
|
||||
std::move(res).assume_error().throw_me();
|
||||
}
|
||||
|
||||
auto response = std::move(res).assume_value();
|
||||
@@ -1035,7 +1061,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
case auth_state::AUTHENTICATION:
|
||||
// Support both SASL auth from protocol v2 and the older style Credentials auth from v1
|
||||
if (cqlop != cql_binary_opcode::AUTH_RESPONSE && cqlop != cql_binary_opcode::CREDENTIALS) {
|
||||
return make_exception_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop))));
|
||||
throw exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop)));
|
||||
}
|
||||
if (res_op == cql_binary_opcode::READY || res_op == cql_binary_opcode::AUTH_SUCCESS) {
|
||||
client_state.set_auth_state(auth_state::READY);
|
||||
@@ -1048,15 +1074,17 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
|
||||
tracing::set_response_size(trace_state, response->size());
|
||||
cql_stats.response_size.add(response->size());
|
||||
return make_ready_future<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(response));
|
||||
}).handle_exception([this, stream, &client_state, trace_state] (std::exception_ptr eptr) {
|
||||
auto response = _server.handle_exception(stream, eptr, trace_state, _version, client_state);
|
||||
if (auto timeout = _server.timeout_for_sleep(eptr)) {
|
||||
// Return read timeout exception, as we wait here until the timeout passes
|
||||
return _server.sleep_until_timeout_passes(*timeout, std::move(response));
|
||||
}
|
||||
return utils::result_into_future<result_with_foreign_response_ptr>(std::move(response));
|
||||
});
|
||||
co_return response_with_latency{std::move(response), std::move(latency_mark)};
|
||||
} catch (...) {
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
// Handle exception outside catch block so co_await is allowed.
|
||||
auto response = _server.handle_exception(stream, eptr, trace_state, _version, client_state);
|
||||
if (auto timeout = _server.timeout_for_sleep(eptr)) {
|
||||
auto resp = co_await _server.sleep_until_timeout_passes(*timeout, make_foreign(std::move(response)));
|
||||
co_return response_with_latency{std::move(resp), std::move(latency_mark)};
|
||||
}
|
||||
co_return response_with_latency{make_foreign(std::move(response)), std::move(latency_mark)};
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1263,14 +1291,15 @@ future<> cql_server::connection::process_request() {
|
||||
op == uint8_t (cql_binary_opcode::EXECUTE) ||
|
||||
op == uint8_t(cql_binary_opcode::BATCH));
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> request_process_future = should_paralelize ?
|
||||
future<response_with_latency> request_process_future = should_paralelize ?
|
||||
_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
|
||||
process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
|
||||
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<response_with_latency> response_f) mutable {
|
||||
try {
|
||||
auto& sg_stats = _server.get_cql_sg_stats();
|
||||
size_t pending_response_size = 0;
|
||||
std::optional<service::deferred_latency_mark> latency_mark;
|
||||
if (response_f.failed()) {
|
||||
const auto message = format("request processing failed, error [{}]", response_f.get_exception());
|
||||
clogger.error("{}: {}", _client_state.get_remote_address(), message);
|
||||
@@ -1278,7 +1307,9 @@ future<> cql_server::connection::process_request() {
|
||||
message,
|
||||
tracing::trace_state_ptr()));
|
||||
} else {
|
||||
auto response = response_f.get();
|
||||
auto result = response_f.get();
|
||||
latency_mark = std::move(result.latency_mark);
|
||||
auto response = std::move(result.response);
|
||||
// Account for response body size exceeding the initial estimate.
|
||||
auto resp_size = response->size();
|
||||
auto permit_size = mem_permit.count();
|
||||
@@ -1291,8 +1322,13 @@ future<> cql_server::connection::process_request() {
|
||||
sg_stats._pending_response_memory += pending_response_size;
|
||||
write_response(std::move(response), _compression);
|
||||
}
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] {
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size, latency_mark = std::move(latency_mark)] () mutable {
|
||||
sg_stats._pending_response_memory -= pending_response_size;
|
||||
// Stop the latency counter and mark the histogram now that
|
||||
// the response has been flushed to the OS socket.
|
||||
if (latency_mark && latency_mark->histogram) {
|
||||
latency_mark->histogram->mark(latency_mark->lc.stop().latency());
|
||||
}
|
||||
});
|
||||
} catch (...) {
|
||||
clogger.error("{}: request processing failed: {}",
|
||||
@@ -1531,6 +1567,7 @@ process_query_internal(service::client_state& client_state, sharded<cql3::query_
|
||||
}
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
query_state.start_latency();
|
||||
auto o = in.read_options(version, qp.local().get_cql_config());
|
||||
if (!o) {
|
||||
return make_exception_future<cql_server::process_fn_return_type>(std::move(o).assume_error());
|
||||
@@ -1552,14 +1589,15 @@ process_query_internal(service::client_state& client_state, sharded<cql3::query_
|
||||
}
|
||||
|
||||
return qp.local().execute_direct_without_checking_exception_message(query.assume_value(), query_state, dialect, options).then([q_state = std::move(q_state), stream, skip_metadata, version] (auto msg) {
|
||||
auto latency_mark = q_state->query_state.take_deferred_latency();
|
||||
if (msg->as_bounce()) {
|
||||
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)));
|
||||
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)), std::move(latency_mark));
|
||||
} else if (msg->is_exception()) {
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()), std::move(latency_mark));
|
||||
} else {
|
||||
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
|
||||
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, cql_metadata_id_wrapper{}, skip_metadata)));
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, cql_metadata_id_wrapper{}, skip_metadata)), std::move(latency_mark));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1629,6 +1667,7 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
query_state.start_latency();
|
||||
auto o = in.read_options(version, qp.local().get_cql_config());
|
||||
if (!o) {
|
||||
return make_exception_future<cql_server::process_fn_return_type>(std::move(o).assume_error());
|
||||
@@ -1670,13 +1709,14 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
|
||||
tracing::trace(trace_state, "Processing a statement");
|
||||
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
|
||||
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
|
||||
auto latency_mark = q_state->query_state.take_deferred_latency();
|
||||
if (msg->as_bounce()) {
|
||||
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)));
|
||||
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)), std::move(latency_mark));
|
||||
} else if (msg->is_exception()) {
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()), std::move(latency_mark));
|
||||
} else {
|
||||
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)), std::move(latency_mark));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1787,6 +1827,7 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
query_state.start_latency();
|
||||
// #563. CQL v2 encodes query_options in v1 format for batch requests.
|
||||
auto o = in.read_options(version, qp.local().get_cql_config());
|
||||
if (!o) {
|
||||
@@ -1810,14 +1851,15 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
|
||||
batch->set_audit_info(batch->audit_info());
|
||||
return qp.local().execute_batch_without_checking_exception_message(batch, query_state, options, std::move(pending_authorization_entries))
|
||||
.then([stream, batch, q_state = std::move(q_state), trace_state = query_state.get_trace_state(), version] (auto msg) {
|
||||
auto latency_mark = q_state->query_state.take_deferred_latency();
|
||||
if (msg->as_bounce()) {
|
||||
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)));
|
||||
return cql_server::process_fn_return_type(make_foreign(static_pointer_cast<messages::result_message::bounce>(msg)), std::move(latency_mark));
|
||||
} else if (msg->is_exception()) {
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()), std::move(latency_mark));
|
||||
} else {
|
||||
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
|
||||
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, trace_state, version, cql_metadata_id_wrapper{})));
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, trace_state, version, cql_metadata_id_wrapper{})), std::move(latency_mark));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1859,10 +1901,10 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
|
||||
|
||||
bool init_trace = (bool)!bounced; // If the request was bounced, we already started the trace in the handler
|
||||
auto msg = co_await coroutine::try_future(process_fn(client_state, _query_processor, in, stream,
|
||||
version, permit, trace_state, init_trace, {}, dialect));
|
||||
while (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&msg)) {
|
||||
version, permit, trace_state, init_trace, std::move(cached_fn_calls), dialect));
|
||||
while (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&msg.result)) {
|
||||
auto shard = (*bounce_msg)->target_shard();
|
||||
auto&& cached_vals = (*bounce_msg)->take_cached_pk_function_calls();
|
||||
auto cached_vals = (*bounce_msg)->take_cached_pk_function_calls();
|
||||
auto target_host = (*bounce_msg)->target_host();
|
||||
auto my_host_id = _query_processor.local().proxy().get_token_metadata_ptr()->get_topology().my_host_id();
|
||||
if (target_host == my_host_id) {
|
||||
@@ -1870,13 +1912,25 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
|
||||
auto sg = _config.bounce_request_smp_service_group;
|
||||
auto gcs = client_state.move_to_other_shard();
|
||||
auto gt = tracing::global_trace_state_ptr(trace_state);
|
||||
msg = co_await container().invoke_on(shard, sg, [&, stream, dialect, version] (cql_server& server) -> future<process_fn_return_type> {
|
||||
msg = co_await container().invoke_on(shard, sg, [&, stream, dialect, version, cached_vals = std::move(cached_vals)] (cql_server& server) mutable -> future<process_fn_return_type> {
|
||||
bytes_ostream linearization_buffer;
|
||||
request_reader in(is, linearization_buffer);
|
||||
auto local_client_state = gcs.get(&server._abort_source);
|
||||
auto local_trace_state = gt.get();
|
||||
co_return co_await process_fn(local_client_state, server._query_processor, in, stream, version,
|
||||
/* FIXME */empty_service_permit(), std::move(local_trace_state), false, cached_vals, dialect);
|
||||
auto ret = co_await process_fn(local_client_state, server._query_processor, in, stream, version,
|
||||
/* FIXME */empty_service_permit(), std::move(local_trace_state), false, std::move(cached_vals), dialect);
|
||||
// Mark latency on the target shard before returning.
|
||||
// The histogram pointer belongs to this shard's stats and
|
||||
// must not be dereferenced from another shard.
|
||||
// Only mark when the result is not a bounce — a bounce means
|
||||
// the request will be retried elsewhere and no real work was done.
|
||||
if (!std::get_if<cql_server::result_with_bounce>(&ret.result)) {
|
||||
if (ret.latency_mark && ret.latency_mark->histogram) {
|
||||
ret.latency_mark->histogram->mark(ret.latency_mark->lc.stop().latency());
|
||||
}
|
||||
}
|
||||
ret.latency_mark.reset();
|
||||
co_return ret;
|
||||
});
|
||||
} else {
|
||||
// Node bounce
|
||||
@@ -1897,7 +1951,7 @@ cql_server::process(uint16_t stream, request_reader in, service::client_state& c
|
||||
.dialect = dialect,
|
||||
.client_state = client_state,
|
||||
.trace_info = tracing::make_trace_info(trace_state),
|
||||
.cached_fn_calls = std::move(cached_fn_calls),
|
||||
.cached_fn_calls = std::move(cached_vals),
|
||||
};
|
||||
|
||||
auto response = co_await forward_cql(
|
||||
|
||||
@@ -250,7 +250,17 @@ public:
|
||||
using response = cql_transport::response;
|
||||
using result_with_foreign_response_ptr = exceptions::coordinator_result<foreign_ptr<std::unique_ptr<cql_server::response>>>;
|
||||
using result_with_bounce = foreign_ptr<seastar::shared_ptr<messages::result_message::bounce>>;
|
||||
using process_fn_return_type = std::variant<result_with_foreign_response_ptr, result_with_bounce>;
|
||||
using process_fn_result = std::variant<result_with_foreign_response_ptr, result_with_bounce>;
|
||||
struct process_fn_return_type {
|
||||
process_fn_result result;
|
||||
std::optional<service::deferred_latency_mark> latency_mark;
|
||||
|
||||
process_fn_return_type(result_with_foreign_response_ptr r, std::optional<service::deferred_latency_mark> lm = std::nullopt);
|
||||
process_fn_return_type(result_with_bounce r, std::optional<service::deferred_latency_mark> lm = std::nullopt);
|
||||
process_fn_return_type(process_fn_return_type&&) noexcept;
|
||||
process_fn_return_type& operator=(process_fn_return_type&&) noexcept;
|
||||
~process_fn_return_type();
|
||||
};
|
||||
|
||||
service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept;
|
||||
service::migration_listener* get_migration_listener() const noexcept;
|
||||
@@ -287,6 +297,11 @@ private:
|
||||
std::optional<seastar::lowres_clock::time_point> timeout_for_sleep(std::exception_ptr eptr) const;
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> sleep_until_timeout_passes(const seastar::lowres_clock::time_point& timeout, foreign_ptr<std::unique_ptr<cql_server::response>>&& resp);
|
||||
|
||||
struct response_with_latency {
|
||||
foreign_ptr<std::unique_ptr<cql_server::response>> response;
|
||||
std::optional<service::deferred_latency_mark> latency_mark;
|
||||
};
|
||||
|
||||
class connection : public generic_server::connection {
|
||||
cql_server& _server;
|
||||
socket_address _server_addr;
|
||||
@@ -308,7 +323,7 @@ private:
|
||||
};
|
||||
private:
|
||||
using execution_stage_type = inheriting_concrete_execution_stage<
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>>,
|
||||
future<response_with_latency>,
|
||||
cql_server::connection*,
|
||||
fragmented_temporary_buffer::istream,
|
||||
uint8_t,
|
||||
@@ -330,7 +345,7 @@ private:
|
||||
private:
|
||||
friend class process_request_executor;
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<cql_server::response>>> process_request_one(fragmented_temporary_buffer::istream buf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit);
|
||||
future<response_with_latency> process_request_one(fragmented_temporary_buffer::istream buf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit);
|
||||
unsigned frame_size() const;
|
||||
unsigned pick_request_cpu();
|
||||
utils::result_with_exception<cql_binary_frame_v3, exceptions::protocol_exception, class cql_frame_error> parse_frame(temporary_buffer<char> buf) const;
|
||||
|
||||
Reference in New Issue
Block a user