db: view: react to synchronous updates tag

Code that waited for all remote view updates was already there. This
commit modifies the conditions of this wait to take into account the
"synchronous mode" (enabled when db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY is
set).
This commit is contained in:
Michał Sala
2022-06-10 13:53:03 +02:00
committed by Piotr Sarna
parent 128806f022
commit d573ab0b58

View File

@@ -37,6 +37,8 @@
#include "db/system_keyspace_view_types.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/tags/utils.hh"
#include "db/tags/extension.hh"
#include "gms/inet_address.hh"
#include "keys.hh"
#include "locator/network_topology_strategy.hh"
@@ -1235,6 +1237,15 @@ static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address
allow_hints);
}
static bool should_update_synchronously(const schema& s) {
auto tag_opt = db::find_tag(s, db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY);
if (!tag_opt.has_value()) {
return false;
}
return *tag_opt == "true";
}
// Take the view mutations generated by generate_view_updates(), which pertain
// to a modification of a single base partition, and apply them to the
// appropriate paired replicas. This is done asynchronously - we do not wait
@@ -1258,6 +1269,15 @@ future<> mutate_MV(
auto remote_endpoints = service::get_local_storage_proxy().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
auto sem_units = pending_view_updates.split(mut.fm.representation().size());
const bool update_synchronously = should_update_synchronously(*mut.s);
if (update_synchronously) {
tracing::trace(tr_state, "Forcing {}.{} view update to be synchronous (synchronous_updates property was set)",
mut.s->ks_name(), mut.s->cf_name()
);
}
// If a view is marked with the synchronous_updates property, we should wait for all.
const bool apply_update_synchronously = wait_for_all || update_synchronously;
// First, find the local endpoint and ensure that if it exists,
// it will be the target endpoint. That way, all endpoints in the
// remote_endpoints list are guaranteed to be remote.
@@ -1332,7 +1352,7 @@ future<> mutate_MV(
schema_ptr s = mut.s;
future<> view_update = apply_to_remote_endpoints(*target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
[s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote,
units = sem_units.split(sem_units.count()), wait_for_all] (future<>&& f) mutable {
units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable {
if (f.failed()) {
stats.view_updates_failed_remote += updates_pushed_remote;
cf_stats.total_view_updates_failed_remote += updates_pushed_remote;
@@ -1341,13 +1361,13 @@ future<> mutate_MV(
*target_endpoint, updates_pushed_remote);
vlogger.error("Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}",
*target_endpoint, s->ks_name(), s->cf_name(), base_token, view_token, ep);
return wait_for_all ? make_exception_future<>(std::move(ep)) : make_ready_future<>();
return apply_update_synchronously ? make_exception_future<>(std::move(ep)) : make_ready_future<>();
}
tracing::trace(tr_state, "Successfully applied view update for {} and {} remote endpoints",
*target_endpoint, updates_pushed_remote);
return make_ready_future<>();
});
if (wait_for_all) {
if (apply_update_synchronously) {
remote_view_update = std::move(view_update);
} else {
// The update is sent to background in order to preserve availability,