storage_service: replicate_to_all_cores: update base and view tables atomically

Currently, the loop updating all tables (including views) with the
new effective_replication_map may yield, and can therefore expose
an intermediate state in which the effective_replication_map and topology
of a base table and of its views are out of sync (as seen in scylladb/scylladb#17786).

To prevent that, loop over all base tables and, for each one,
update the base table and all of its views atomically, without yielding,
so that yielding is allowed only between base tables.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2024-12-04 16:47:07 +02:00
parent 10c4cf930c
commit 4bfa3060d0

View File

@@ -3050,7 +3050,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
std::vector<std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr>> pending_effective_replication_maps;
pending_effective_replication_maps.resize(smp::count);
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms;
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms;
pending_table_erms.resize(smp::count);
pending_view_erms.resize(smp::count);
std::unordered_set<session_id> open_sessions;
@@ -3119,7 +3121,11 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
} else {
erm = pending_effective_replication_maps[this_shard_id()][table->schema()->ks_name()];
}
pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
if (table->schema()->is_view()) {
pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
} else {
pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
}
});
});
} catch (...) {
@@ -3133,6 +3139,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]);
auto erms = std::move(pending_effective_replication_maps[this_shard_id()]);
auto table_erms = std::move(pending_table_erms[this_shard_id()]);
auto view_erms = std::move(pending_view_erms[this_shard_id()]);
co_await utils::clear_gently(erms);
co_await utils::clear_gently(tmptr);
@@ -3158,9 +3165,21 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
}
auto& table_erms = pending_table_erms[this_shard_id()];
auto& view_erms = pending_view_erms[this_shard_id()];
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
// Update base/views effective_replication_maps atomically.
auto& cf = db.find_column_family(it->first);
cf.update_effective_replication_map(std::move(it->second));
for (const auto& view_ptr : cf.views()) {
const auto& view_id = view_ptr->id();
auto view_it = view_erms.find(view_id);
if (view_it == view_erms.end()) {
throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
}
auto& view = db.find_column_family(view_id);
view.update_effective_replication_map(std::move(view_it->second));
view_erms.erase(view_it);
}
co_await utils::get_local_injector().inject("delay_after_erm_update", [&cf, &ss] (auto& handler) -> future<> {
auto& ss_ = ss;
const auto ks_name = handler.get("ks_name");
@@ -3179,6 +3198,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
it = table_erms.erase(it);
}
if (!view_erms.empty()) {
throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
}
auto& session_mgr = get_topology_session_manager();
session_mgr.initiate_close_of_sessions_except(open_sessions);
for (auto id : open_sessions) {