database: mutation_query(): convert into coroutine

To facilitate further patching (and reading).
This commit is contained in:
Botond Dénes
2021-06-10 16:58:19 +03:00
parent d2f5393a43
commit f9d302bf49

View File

@@ -1453,32 +1453,28 @@ future<std::tuple<reconcilable_result, cache_temperature>>
database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
const auto short_read_allwoed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
return get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed).then(
[&, s = std::move(s), trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) {
auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed);
column_family& cf = find_column_family(cmd.cf_id);
auto& semaphore = get_reader_concurrency_semaphore();
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
return _mutation_query_stage(&cf,
std::move(s),
seastar::cref(cmd),
class_config,
seastar::cref(range),
std::move(trace_state),
std::move(accounter),
timeout,
std::move(cache_ctx)).then_wrapped([this, s = _stats, &semaphore, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) {
if (f.failed()) {
++semaphore.get_stats().total_failed_reads;
return make_exception_future<std::tuple<reconcilable_result, cache_temperature>>(f.get_exception());
} else {
++semaphore.get_stats().total_successful_reads;
auto result = f.get0();
s->short_mutation_queries += bool(result.is_short_read());
return make_ready_future<std::tuple<reconcilable_result, cache_temperature>>(std::tuple(std::move(result), hit_rate));
}
});
});
reconcilable_result result;
try {
auto op = cf.read_in_progress();
result = co_await _mutation_query_stage(&cf, std::move(s), seastar::cref(cmd), class_config, seastar::cref(range),
std::move(trace_state), std::move(accounter), timeout, std::move(cache_ctx));
} catch (...) {
++semaphore.get_stats().total_failed_reads;
throw;
}
auto hit_rate = cf.get_global_cache_hit_rate();
++semaphore.get_stats().total_successful_reads;
_stats->short_mutation_queries += bool(result.is_short_read());
co_return std::tuple(std::move(result), hit_rate);
}
std::unordered_set<sstring> database::get_initial_tokens() {