table: coroutinize do_push_view_replica_updates

Makes the code cleaner, but more importantly it will make it easier
to futurize calculate_affected_clustering_ranges in the near future.
This commit is contained in:
Piotr Sarna
2021-06-14 11:50:35 +02:00
parent 3592d9b36e
commit e3fa0246a1
2 changed files with 21 additions and 30 deletions

View File

@@ -1002,7 +1002,7 @@ public:
}
private:
future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source,
future<row_locker::lock_holder> do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update, gc_clock::time_point now) const;
future<> generate_and_propagate_view_updates(const schema_ptr& base,

View File

@@ -2187,7 +2187,7 @@ future<row_locker::lock_holder> table::push_view_replica_updates(const schema_pt
return push_view_replica_updates(s, std::move(m), timeout, std::move(tr_state), sem);
}
future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source,
future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const {
if (!_config.view_update_concurrency_semaphore->current()) {
// We don't have resources to generate view updates for this write. If we reached this point, we failed to
@@ -2196,9 +2196,9 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
// drop the base write, which could create inconsistencies between base replicas. So we dolefully continue,
// and note the fact we dropped a view update.
++_config.cf_stats->dropped_view_updates;
return make_ready_future<row_locker::lock_holder>();
co_return row_locker::lock_holder();
}
auto& base = schema();
schema_ptr base = schema();
m.upgrade(base);
gc_clock::time_point now = gc_clock::now();
utils::get_local_injector().inject("table_push_view_replica_updates_stale_time_point", [&now] {
@@ -2206,16 +2206,15 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
});
auto views = db::view::with_base_info_snapshot(affected_views(base, m, now));
if (views.empty()) {
return make_ready_future<row_locker::lock_holder>();
co_return row_locker::lock_holder();
}
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views, now);
if (cr_ranges.empty()) {
tracing::trace(tr_state, "View updates do not require read-before-write");
return generate_and_propagate_view_updates(base, sem.make_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now).then([] {
// In this case we are not doing a read-before-write, just a
// write, so no lock is needed.
return make_ready_future<row_locker::lock_holder>();
});
co_await generate_and_propagate_view_updates(base, sem.make_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now);
// In this case we are not doing a read-before-write, just a
// write, so no lock is needed.
co_return row_locker::lock_holder();
}
// We read the whole set of regular columns in case the update now causes a base row to pass
// a view's filters, and a view happens to include columns that have no value in this update.
@@ -2234,26 +2233,18 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
// We'll return this lock to the caller, which will release it after
// writing the base-table update.
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
return utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout).then([lockf = std::move(lockf), timeout] () mutable {
return std::move(lockf);
}).then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, now, source = std::move(source), &sem, tr_state = std::move(tr_state), &io_priority] (row_locker::lock_holder lock) mutable {
return do_with(
dht::partition_range::make_singular(m.decorated_key()),
std::move(slice),
std::move(m),
[base, views = std::move(views), lock = std::move(lock), this, timeout, now, source = std::move(source), &sem, &io_priority, tr_state = std::move(tr_state)] (auto& pk, auto& slice, auto& m) mutable {
auto permit = sem.make_permit(base.get(), "push-view-updates-2");
auto reader = source.make_reader(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now)
.then([base, tr_state = std::move(tr_state), lock = std::move(lock)] () mutable {
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this
// partition/row and destroys the lock object.
return std::move(lock);
});
});
});
co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout);
auto lock = co_await std::move(lockf);
auto pk = dht::partition_range::make_singular(m.decorated_key());
auto permit = sem.make_permit(base.get(), "push-view-updates-2");
auto reader = source.make_reader(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now);
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this
// partition/row and destroys the lock object.
co_return std::move(lock);
}
future<row_locker::lock_holder> table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,