From d573ab0b58dd7b599cd2ebba871da0f00a692de6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Sala?= Date: Fri, 10 Jun 2022 13:53:03 +0200 Subject: [PATCH] db: view: react to synchronous updates tag Code that waited for all remote view updates was already there. This commit modifies the conditions of this wait to take into account the "synchronous mode" (enabled when db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY is set). --- db/view/view.cc | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index d11b2f4f55..8bb25be10b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -37,6 +37,8 @@ #include "db/system_keyspace_view_types.hh" #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" +#include "db/tags/utils.hh" +#include "db/tags/extension.hh" #include "gms/inet_address.hh" #include "keys.hh" #include "locator/network_topology_strategy.hh" @@ -1235,6 +1237,15 @@ static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address allow_hints); } +static bool should_update_synchronously(const schema& s) { + auto tag_opt = db::find_tag(s, db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY); + if (!tag_opt.has_value()) { + return false; + } + + return *tag_opt == "true"; +} + // Take the view mutations generated by generate_view_updates(), which pertain // to a modification of a single base partition, and apply them to the // appropriate paired replicas. This is done asynchronously - we do not wait @@ -1258,6 +1269,15 @@ future<> mutate_MV( auto remote_endpoints = service::get_local_storage_proxy().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name); auto sem_units = pending_view_updates.split(mut.fm.representation().size()); + const bool update_synchronously = should_update_synchronously(*mut.s); + if (update_synchronously) { + tracing::trace(tr_state, "Forcing {}.{} view update to be synchronous (synchronous_updates property was set)", + mut.s->ks_name(), mut.s->cf_name() + ); + } + // If a view is marked with the synchronous_updates property, we should wait for all. + const bool apply_update_synchronously = wait_for_all || update_synchronously; + // First, find the local endpoint and ensure that if it exists, // it will be the target endpoint. That way, all endpoints in the // remote_endpoints list are guaranteed to be remote. @@ -1332,7 +1352,7 @@ future<> mutate_MV( schema_ptr s = mut.s; future<> view_update = apply_to_remote_endpoints(*target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( [s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote, - units = sem_units.split(sem_units.count()), wait_for_all] (future<>&& f) mutable { + units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable { if (f.failed()) { stats.view_updates_failed_remote += updates_pushed_remote; cf_stats.total_view_updates_failed_remote += updates_pushed_remote; @@ -1341,13 +1361,13 @@ future<> mutate_MV( *target_endpoint, updates_pushed_remote); vlogger.error("Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}", *target_endpoint, s->ks_name(), s->cf_name(), base_token, view_token, ep); - return wait_for_all ? make_exception_future<>(std::move(ep)) : make_ready_future<>(); + return apply_update_synchronously ? make_exception_future<>(std::move(ep)) : make_ready_future<>(); } tracing::trace(tr_state, "Successfully applied view update for {} and {} remote endpoints", *target_endpoint, updates_pushed_remote); return make_ready_future<>(); }); - if (wait_for_all) { + if (apply_update_synchronously) { remote_view_update = std::move(view_update); } else { // The update is sent to background in order to preserve availability,