From f9d302bf498f76feb3c2aed19eb9c467322f0e20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 10 Jun 2021 16:58:19 +0300 Subject: [PATCH] database: mutation_query(): convert into coroutine To facilitate further patching (and reading). --- database.cc | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/database.cc b/database.cc index 80eaae203a..b6d7746fb3 100644 --- a/database.cc +++ b/database.cc @@ -1453,32 +1453,28 @@ future> database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { const auto short_read_allwoed = query::short_read(cmd.slice.options.contains()); - return get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed).then( - [&, s = std::move(s), trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) { + auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed); column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page); - return _mutation_query_stage(&cf, - std::move(s), - seastar::cref(cmd), - class_config, - seastar::cref(range), - std::move(trace_state), - std::move(accounter), - timeout, - std::move(cache_ctx)).then_wrapped([this, s = _stats, &semaphore, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) { - if (f.failed()) { - ++semaphore.get_stats().total_failed_reads; - return make_exception_future>(f.get_exception()); - } else { - ++semaphore.get_stats().total_successful_reads; - auto result = f.get0(); - s->short_mutation_queries += bool(result.is_short_read()); - return make_ready_future>(std::tuple(std::move(result), hit_rate)); - } - }); - }); + + reconcilable_result result; + + try { + auto op = cf.read_in_progress(); + + result = co_await _mutation_query_stage(&cf, std::move(s), seastar::cref(cmd), class_config, seastar::cref(range), + std::move(trace_state), std::move(accounter), timeout, std::move(cache_ctx)); + } catch (...) { + ++semaphore.get_stats().total_failed_reads; + throw; + } + + auto hit_rate = cf.get_global_cache_hit_rate(); + ++semaphore.get_stats().total_successful_reads; + _stats->short_mutation_queries += bool(result.is_short_read()); + co_return std::tuple(std::move(result), hit_rate); } std::unordered_set database::get_initial_tokens() {