From e3fa0246a110d3aaa6e022183a821c873e2a1fca Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 14 Jun 2021 11:50:35 +0200 Subject: [PATCH] table: coroutinize do_push_view_replica_updates Makes the code cleaner, but more importantly it will make it easier to futurize calculate_affected_clustering_ranges in the near future. --- database.hh | 2 +- table.cc | 49 ++++++++++++++++++++----------------------------- 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/database.hh b/database.hh index b45663b0c0..0830f3b477 100644 --- a/database.hh +++ b/database.hh @@ -1002,7 +1002,7 @@ public: } private: - future do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source, + future do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const; std::vector affected_views(const schema_ptr& base, const mutation& update, gc_clock::time_point now) const; future<> generate_and_propagate_view_updates(const schema_ptr& base, diff --git a/table.cc b/table.cc index 5547ac08d3..e1f17b934f 100644 --- a/table.cc +++ b/table.cc @@ -2187,7 +2187,7 @@ future table::push_view_replica_updates(const schema_pt return push_view_replica_updates(s, std::move(m), timeout, std::move(tr_state), sem); } -future table::do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source, +future table::do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const { if (!_config.view_update_concurrency_semaphore->current()) { // We don't have resources to generate view updates for this write. If we reached this point, we failed to @@ -2196,9 +2196,9 @@ future table::do_push_view_replica_updates(const schema // drop the base write, which could create inconsistencies between base replicas. So we dolefully continue, // and note the fact we dropped a view update. ++_config.cf_stats->dropped_view_updates; - return make_ready_future(); + co_return row_locker::lock_holder(); } - auto& base = schema(); + schema_ptr base = schema(); m.upgrade(base); gc_clock::time_point now = gc_clock::now(); utils::get_local_injector().inject("table_push_view_replica_updates_stale_time_point", [&now] { @@ -2206,16 +2206,15 @@ future table::do_push_view_replica_updates(const schema }); auto views = db::view::with_base_info_snapshot(affected_views(base, m, now)); if (views.empty()) { - return make_ready_future(); + co_return row_locker::lock_holder(); } auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views, now); if (cr_ranges.empty()) { tracing::trace(tr_state, "View updates do not require read-before-write"); - return generate_and_propagate_view_updates(base, sem.make_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now).then([] { - // In this case we are not doing a read-before-write, just a - // write, so no lock is needed. - return make_ready_future(); - }); + co_await generate_and_propagate_view_updates(base, sem.make_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now); + // In this case we are not doing a read-before-write, just a + // write, so no lock is needed. + co_return row_locker::lock_holder(); } // We read the whole set of regular columns in case the update now causes a base row to pass // a view's filters, and a view happens to include columns that have no value in this update. @@ -2234,26 +2233,18 @@ future table::do_push_view_replica_updates(const schema // We'll return this lock to the caller, which will release it after // writing the base-table update. future lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout); - return utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout).then([lockf = std::move(lockf), timeout] () mutable { - return std::move(lockf); - }).then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, now, source = std::move(source), &sem, tr_state = std::move(tr_state), &io_priority] (row_locker::lock_holder lock) mutable { - return do_with( - dht::partition_range::make_singular(m.decorated_key()), - std::move(slice), - std::move(m), - [base, views = std::move(views), lock = std::move(lock), this, timeout, now, source = std::move(source), &sem, &io_priority, tr_state = std::move(tr_state)] (auto& pk, auto& slice, auto& m) mutable { - auto permit = sem.make_permit(base.get(), "push-view-updates-2"); - auto reader = source.make_reader(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); - return this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now) - .then([base, tr_state = std::move(tr_state), lock = std::move(lock)] () mutable { - tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); - // return the local partition/row lock we have taken so it - // remains locked until the caller is done modifying this - // partition/row and destroys the lock object. - return std::move(lock); - }); - }); - }); + co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout); + auto lock = co_await std::move(lockf); + auto pk = dht::partition_range::make_singular(m.decorated_key()); + auto permit = sem.make_permit(base.get(), "push-view-updates-2"); + auto reader = source.make_reader(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); + co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); + tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); + // return the local partition/row lock we have taken so it + // remains locked until the caller is done modifying this + // partition/row and destroys the lock object. + co_return std::move(lock); + } future table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,