diff --git a/db/view/view.cc b/db/view/view.cc
index d11b2f4f55..8bb25be10b 100644
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -37,6 +37,8 @@
 #include "db/system_keyspace_view_types.hh"
 #include "db/system_keyspace.hh"
 #include "db/system_distributed_keyspace.hh"
+#include "db/tags/utils.hh"
+#include "db/tags/extension.hh"
 #include "gms/inet_address.hh"
 #include "keys.hh"
 #include "locator/network_topology_strategy.hh"
@@ -1235,6 +1237,15 @@ static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address
         allow_hints);
 }
 
+static bool should_update_synchronously(const schema& s) {
+    auto tag_opt = db::find_tag(s, db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY);
+    if (!tag_opt.has_value()) {
+        return false;
+    }
+
+    return *tag_opt == "true";
+}
+
 // Take the view mutations generated by generate_view_updates(), which pertain
 // to a modification of a single base partition, and apply them to the
 // appropriate paired replicas. This is done asynchronously - we do not wait
@@ -1258,6 +1269,15 @@ future<> mutate_MV(
     auto remote_endpoints = service::get_local_storage_proxy().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
     auto sem_units = pending_view_updates.split(mut.fm.representation().size());
 
+    const bool update_synchronously = should_update_synchronously(*mut.s);
+    if (update_synchronously) {
+        tracing::trace(tr_state, "Forcing {}.{} view update to be synchronous (synchronous_updates property was set)",
+            mut.s->ks_name(), mut.s->cf_name()
+        );
+    }
+
+    // If a view is marked with the synchronous_updates property, we should wait for all.
+    const bool apply_update_synchronously = wait_for_all || update_synchronously;
     // First, find the local endpoint and ensure that if it exists,
     // it will be the target endpoint. That way, all endpoints in the
     // remote_endpoints list are guaranteed to be remote.
@@ -1332,7 +1352,7 @@ future<> mutate_MV(
         schema_ptr s = mut.s;
         future<> view_update = apply_to_remote_endpoints(*target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
                 [s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote,
-                        units = sem_units.split(sem_units.count()), wait_for_all] (future<>&& f) mutable {
+                        units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable {
             if (f.failed()) {
                 stats.view_updates_failed_remote += updates_pushed_remote;
                 cf_stats.total_view_updates_failed_remote += updates_pushed_remote;
@@ -1341,13 +1361,13 @@ future<> mutate_MV(
                         *target_endpoint, updates_pushed_remote);
                 vlogger.error("Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}",
                         *target_endpoint, s->ks_name(), s->cf_name(), base_token, view_token, ep);
-                return wait_for_all ? make_exception_future<>(std::move(ep)) : make_ready_future<>();
+                return apply_update_synchronously ? make_exception_future<>(std::move(ep)) : make_ready_future<>();
             }
             tracing::trace(tr_state, "Successfully applied view update for {} and {} remote endpoints",
                     *target_endpoint, updates_pushed_remote);
             return make_ready_future<>();
         });
-        if (wait_for_all) {
+        if (apply_update_synchronously) {
             remote_view_update = std::move(view_update);
         } else {
             // The update is sent to background in order to preserve availability,