diff --git a/service/storage_service.cc b/service/storage_service.cc index da8f2d16de..ddb2b38b30 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3050,7 +3050,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt std::vector> pending_effective_replication_maps; pending_effective_replication_maps.resize(smp::count); std::vector> pending_table_erms; + std::vector> pending_view_erms; pending_table_erms.resize(smp::count); + pending_view_erms.resize(smp::count); std::unordered_set open_sessions; @@ -3119,7 +3121,11 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt } else { erm = pending_effective_replication_maps[this_shard_id()][table->schema()->ks_name()]; } - pending_table_erms[this_shard_id()].emplace(id, std::move(erm)); + if (table->schema()->is_view()) { + pending_view_erms[this_shard_id()].emplace(id, std::move(erm)); + } else { + pending_table_erms[this_shard_id()].emplace(id, std::move(erm)); + } }); }); } catch (...) { @@ -3133,6 +3139,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]); auto erms = std::move(pending_effective_replication_maps[this_shard_id()]); auto table_erms = std::move(pending_table_erms[this_shard_id()]); + auto view_erms = std::move(pending_view_erms[this_shard_id()]); co_await utils::clear_gently(erms); co_await utils::clear_gently(tmptr); @@ -3158,9 +3165,21 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt } auto& table_erms = pending_table_erms[this_shard_id()]; + auto& view_erms = pending_view_erms[this_shard_id()]; for (auto it = table_erms.begin(); it != table_erms.end(); ) { + // Update base/views effective_replication_maps atomically. auto& cf = db.find_column_family(it->first); cf.update_effective_replication_map(std::move(it->second)); + for (const auto& view_ptr : cf.views()) { + const auto& view_id = view_ptr->id(); + auto view_it = view_erms.find(view_id); + if (view_it == view_erms.end()) { + throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id)); + } + auto& view = db.find_column_family(view_id); + view.update_effective_replication_map(std::move(view_it->second)); + view_erms.erase(view_it); + } co_await utils::get_local_injector().inject("delay_after_erm_update", [&cf, &ss] (auto& handler) -> future<> { auto& ss_ = ss; const auto ks_name = handler.get("ks_name"); @@ -3179,6 +3198,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt it = table_erms.erase(it); } + if (!view_erms.empty()) { + throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms))); + } + auto& session_mgr = get_topology_session_manager(); session_mgr.initiate_close_of_sessions_except(open_sessions); for (auto id : open_sessions) {