storage_service: replicate_to_all_cores: update base and view tables atomically
Currently, the loop updating all tables (including views) with the new effective_replication_map may yield, and therefore expose a state where the base and view tables' effective_replication_map and topology are out of sync (as seen in scylladb/scylladb#17786). To prevent that, loop over all base tables and, for each, update the base table and all of its views atomically, without yielding, allowing yields only between base tables. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -3050,7 +3050,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
std::vector<std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr>> pending_effective_replication_maps;
|
||||
pending_effective_replication_maps.resize(smp::count);
|
||||
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms;
|
||||
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms;
|
||||
pending_table_erms.resize(smp::count);
|
||||
pending_view_erms.resize(smp::count);
|
||||
|
||||
std::unordered_set<session_id> open_sessions;
|
||||
|
||||
@@ -3119,7 +3121,11 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
} else {
|
||||
erm = pending_effective_replication_maps[this_shard_id()][table->schema()->ks_name()];
|
||||
}
|
||||
pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
|
||||
if (table->schema()->is_view()) {
|
||||
pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
|
||||
} else {
|
||||
pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
|
||||
}
|
||||
});
|
||||
});
|
||||
} catch (...) {
|
||||
@@ -3133,6 +3139,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]);
|
||||
auto erms = std::move(pending_effective_replication_maps[this_shard_id()]);
|
||||
auto table_erms = std::move(pending_table_erms[this_shard_id()]);
|
||||
auto view_erms = std::move(pending_view_erms[this_shard_id()]);
|
||||
|
||||
co_await utils::clear_gently(erms);
|
||||
co_await utils::clear_gently(tmptr);
|
||||
@@ -3158,9 +3165,21 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
}
|
||||
|
||||
auto& table_erms = pending_table_erms[this_shard_id()];
|
||||
auto& view_erms = pending_view_erms[this_shard_id()];
|
||||
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
|
||||
// Update base/views effective_replication_maps atomically.
|
||||
auto& cf = db.find_column_family(it->first);
|
||||
cf.update_effective_replication_map(std::move(it->second));
|
||||
for (const auto& view_ptr : cf.views()) {
|
||||
const auto& view_id = view_ptr->id();
|
||||
auto view_it = view_erms.find(view_id);
|
||||
if (view_it == view_erms.end()) {
|
||||
throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
|
||||
}
|
||||
auto& view = db.find_column_family(view_id);
|
||||
view.update_effective_replication_map(std::move(view_it->second));
|
||||
view_erms.erase(view_it);
|
||||
}
|
||||
co_await utils::get_local_injector().inject("delay_after_erm_update", [&cf, &ss] (auto& handler) -> future<> {
|
||||
auto& ss_ = ss;
|
||||
const auto ks_name = handler.get("ks_name");
|
||||
@@ -3179,6 +3198,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
it = table_erms.erase(it);
|
||||
}
|
||||
|
||||
if (!view_erms.empty()) {
|
||||
throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
|
||||
}
|
||||
|
||||
auto& session_mgr = get_topology_session_manager();
|
||||
session_mgr.initiate_close_of_sessions_except(open_sessions);
|
||||
for (auto id : open_sessions) {
|
||||
|
||||
Reference in New Issue
Block a user