From 106fd39c6ca04c9123fc50a7bbf343ed2dc6c81e Mon Sep 17 00:00:00 2001
From: Marcin Maliszkiewicz
Date: Tue, 5 Aug 2025 10:09:06 +0200
Subject: [PATCH] storage_service: split replicate_to_all_cores into steps

In later commits the schema merge code will use these prepare and commit
steps. The rest of the code will continue to use replicate_to_all_cores.
---
 service/storage_service.cc | 154 ++++++++++++++++++++-----------------
 service/storage_service.hh |  19 +++++
 2 files changed, 101 insertions(+), 72 deletions(-)

diff --git a/service/storage_service.cc b/service/storage_service.cc
index 35063ad7f1..2f2502c7f6 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -19,6 +19,7 @@
 #include "gc_clock.hh"
 #include "raft/raft.hh"
 #include
+#include
 #include "service/qos/raft_service_level_distributed_data_accessor.hh"
 #include "service/qos/service_level_controller.hh"
@@ -3207,28 +3208,16 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
             std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), start_hm, new_generation);
 }
 
-future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
+future<token_metadata_change> storage_service::prepare_token_metadata_change(mutable_token_metadata_ptr tmptr) {
     SCYLLA_ASSERT(this_shard_id() == 0);
-
-    slogger.debug("Replicating token_metadata to all cores");
     std::exception_ptr ex;
-
-    std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
-    pending_token_metadata_ptr.resize(smp::count);
-    std::vector<std::unordered_map<sstring, locator::effective_replication_map_ptr>> pending_effective_replication_maps;
-    pending_effective_replication_maps.resize(smp::count);
-    std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms;
-    std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms;
-    pending_table_erms.resize(smp::count);
-    pending_view_erms.resize(smp::count);
-
-    std::unordered_set<session_id> open_sessions;
+    token_metadata_change change;
 
     // Collect open sessions
     {
         auto session = _topology_state_machine._topology.session;
         if (session) {
-            open_sessions.insert(session);
+            change.open_sessions.insert(session);
         }
 
         for (auto&& [table, tables] : tmptr->tablets().all_table_groups()) {
@@ -3236,7 +3225,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
             for (auto&& [tid, trinfo]: tmap.transitions()) {
                 if (trinfo.session_id) {
                     auto id = session_id(trinfo.session_id);
-                    open_sessions.insert(id);
+                    change.open_sessions.insert(id);
                 }
             }
         }
@@ -3244,11 +3233,11 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
 
     try {
         auto base_shard = this_shard_id();
-        pending_token_metadata_ptr[base_shard] = tmptr;
+        change.pending_token_metadata_ptr[base_shard] = tmptr;
         auto& sharded_token_metadata = _shared_token_metadata.container();
         // clone a local copy of updated token_metadata on all other shards
         co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
-            pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
+            change.pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
         });
 
         // Precalculate new effective_replication_map for all keyspaces
@@ -3265,7 +3254,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
                 continue;
             }
             auto erm = co_await get_erm_factory().create_static_effective_replication_map(rs, tmptr);
-            pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm));
+            change.pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm));
         }
         co_await container().invoke_on_others([&] (storage_service& ss) -> future<> {
             auto& db = ss._db.local();
@@ -3274,27 +3263,27 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
                 if (rs->is_per_table()) {
                     continue;
                 }
-                auto tmptr = pending_token_metadata_ptr[this_shard_id()];
+                auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
                 auto erm = co_await ss.get_erm_factory().create_static_effective_replication_map(rs, tmptr);
-                pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm));
+                change.pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm));
             }
         });
         // Prepare per-table erms.
         co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
             auto& db = ss._db.local();
-            auto tmptr = pending_token_metadata_ptr[this_shard_id()];
+            auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
             co_await db.get_tables_metadata().for_each_table_gently([&] (table_id id, lw_shared_ptr<replica::table> table) {
                 auto rs = db.find_keyspace(table->schema()->ks_name()).get_replication_strategy_ptr();
                 locator::effective_replication_map_ptr erm;
                 if (auto pt_rs = rs->maybe_as_per_table()) {
                     erm = pt_rs->make_replication_map(id, tmptr);
                 } else {
-                    erm = pending_effective_replication_maps[this_shard_id()][table->schema()->ks_name()];
+                    erm = change.pending_effective_replication_maps[this_shard_id()][table->schema()->ks_name()];
                 }
                 if (table->schema()->is_view()) {
-                    pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
+                    change.pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
                 } else {
-                    pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
+                    change.pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
                 }
                 return make_ready_future();
             });
@@ -3307,10 +3296,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
     if (ex) {
         try {
             co_await smp::invoke_on_all([&] () -> future<> {
-                auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]);
-                auto erms = std::move(pending_effective_replication_maps[this_shard_id()]);
-                auto table_erms = std::move(pending_table_erms[this_shard_id()]);
-                auto view_erms = std::move(pending_view_erms[this_shard_id()]);
+                auto tmptr = std::move(change.pending_token_metadata_ptr[this_shard_id()]);
+                auto erms = std::move(change.pending_effective_replication_maps[this_shard_id()]);
+                auto table_erms = std::move(change.pending_table_erms[this_shard_id()]);
+                auto view_erms = std::move(change.pending_view_erms[this_shard_id()]);
 
                 co_await utils::clear_gently(erms);
                 co_await utils::clear_gently(tmptr);
@@ -3324,58 +3313,58 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
         std::rethrow_exception(std::move(ex));
     }
 
-    // Apply changes on all shards
+    co_return change;
+}
+
+void storage_service::commit_token_metadata_change(token_metadata_change& change) noexcept {
+    slogger.debug("Replicating token_metadata");
+
+    // Apply changes on a single shard
     try {
-        co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
-            ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()]));
-            auto& db = ss._db.local();
+        _shared_token_metadata.set(std::move(change.pending_token_metadata_ptr[this_shard_id()]));
+        auto& db = _db.local();
 
-            auto& erms = pending_effective_replication_maps[this_shard_id()];
-            for (auto it = erms.begin(); it != erms.end(); ) {
-                auto& ks = db.find_keyspace(it->first);
-                ks.update_static_effective_replication_map(std::move(it->second));
-                it = erms.erase(it);
-            }
+        auto& erms = change.pending_effective_replication_maps[this_shard_id()];
+        for (auto it = erms.begin(); it != erms.end(); ) {
+            auto& ks = db.find_keyspace(it->first);
+            ks.update_static_effective_replication_map(std::move(it->second));
+            it = erms.erase(it);
+        }
 
-            auto& table_erms = pending_table_erms[this_shard_id()];
-            auto& view_erms = pending_view_erms[this_shard_id()];
-            for (auto it = table_erms.begin(); it != table_erms.end(); ) {
-                co_await coroutine::maybe_yield();
-                // Update base/views effective_replication_maps atomically.
-                auto& cf = db.find_column_family(it->first);
-                cf.update_effective_replication_map(std::move(it->second));
-                for (const auto& view_ptr : cf.views()) {
-                    const auto& view_id = view_ptr->id();
-                    auto view_it = view_erms.find(view_id);
-                    if (view_it == view_erms.end()) {
-                        throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
-                    }
-                    auto& view = db.find_column_family(view_id);
-                    view.update_effective_replication_map(std::move(view_it->second));
-                    if (view.uses_tablets()) {
-                        register_tablet_split_candidate(view_it->first);
-                    }
-                    view_erms.erase(view_it);
+        auto& table_erms = change.pending_table_erms[this_shard_id()];
+        auto& view_erms = change.pending_view_erms[this_shard_id()];
+        for (auto it = table_erms.begin(); it != table_erms.end(); ) {
+            // Update base/views effective_replication_maps atomically.
+            auto& cf = db.find_column_family(it->first);
+            cf.update_effective_replication_map(std::move(it->second));
+            for (const auto& view_ptr : cf.views()) {
+                const auto& view_id = view_ptr->id();
+                auto view_it = view_erms.find(view_id);
+                if (view_it == view_erms.end()) {
+                    throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
                 }
-                if (cf.uses_tablets()) {
-                    register_tablet_split_candidate(it->first);
+                auto& view = db.find_column_family(view_id);
+                view.update_effective_replication_map(std::move(view_it->second));
+                if (view.uses_tablets()) {
+                    register_tablet_split_candidate(view_it->first);
                 }
-                it = table_erms.erase(it);
+                view_erms.erase(view_it);
             }
-
-            if (!view_erms.empty()) {
-                throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
+            if (cf.uses_tablets()) {
+                register_tablet_split_candidate(it->first);
             }
+            it = table_erms.erase(it);
+        }
 
-            auto& session_mgr = get_topology_session_manager();
-            session_mgr.initiate_close_of_sessions_except(open_sessions);
-            for (auto id : open_sessions) {
-                session_mgr.create_session(id);
-            }
+        if (!view_erms.empty()) {
+            throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
+        }
 
-            auto& gc_state = db.get_compaction_manager().get_shared_tombstone_gc_state();
-            co_await gc_state.flush_pending_repair_time_update(db);
-        });
+        auto& session_mgr = get_topology_session_manager();
+        session_mgr.initiate_close_of_sessions_except(change.open_sessions);
+        for (auto id : change.open_sessions) {
+            session_mgr.create_session(id);
+        }
     } catch (...) {
         // applying the changes on all shards should never fail
         // it will end up in an inconsistent state that we can't recover from.
@@ -3384,6 +3373,27 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
     }
 }
 
+future<> token_metadata_change::destroy() {
+    return smp::invoke_on_all([this] () -> future<> {
+        pending_token_metadata_ptr[this_shard_id()] = nullptr;
+        co_await utils::clear_gently(pending_effective_replication_maps[this_shard_id()]);
+        co_await utils::clear_gently(pending_table_erms[this_shard_id()]);
+        co_await utils::clear_gently(pending_view_erms[this_shard_id()]);
+    });
+}
+
+future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
+    SCYLLA_ASSERT(this_shard_id() == 0);
+    slogger.debug("Replicating token_metadata to all cores");
+    auto change = co_await prepare_token_metadata_change(tmptr);
+    co_await container().invoke_on_all([&change] (storage_service& ss) {
+        ss.commit_token_metadata_change(change);
+    });
+    co_await change.destroy();
+    co_await _db.local().get_compaction_manager().get_shared_tombstone_gc_state().
+        flush_pending_repair_time_update(_db.local());
+}
+
 future<> storage_service::stop() {
     // if there is a background "isolate" shutdown
     // in progress, we need to sync with it. Mostly
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 08820dae04..561e22d55f 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -141,6 +141,16 @@ class node_ops_meta_data;
 using start_hint_manager = seastar::bool_class<class start_hint_manager_tag>;
 using loosen_constraints = seastar::bool_class<class loosen_constraints_tag>;
 
+struct token_metadata_change {
+    std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr{smp::count};
+    std::vector<std::unordered_map<sstring, locator::effective_replication_map_ptr>> pending_effective_replication_maps{smp::count};
+    std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms{smp::count};
+    std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms{smp::count};
+    std::unordered_set<session_id> open_sessions;
+
+    future<> destroy();
+};
+
 /**
  * This abstraction contains the token/identifier of this node
  * on the identifier space. This token gets gossiped around.
@@ -289,6 +299,15 @@ private:
     // Note: must be called on shard 0.
     future<> mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock aml = acquire_merge_lock::yes) noexcept;
 
+    // Prepares a token metadata change without making it visible. Combined with the commit
+    // function and an appropriate lock it does exactly the same as mutate_token_metadata.
+    // Note: prepare_token_metadata_change must be called on shard 0.
+    future<token_metadata_change> prepare_token_metadata_change(mutable_token_metadata_ptr tmptr);
+
+    // Commits prepared token metadata changes. Must be called under token_metadata_lock
+    // and on all shards.
+    void commit_token_metadata_change(token_metadata_change& change) noexcept;
+
     // Update pending ranges locally and then replicate to all cores.
     // Should be serialized under token_metadata_lock.
    // Must be called on shard 0.
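
Usage sketch (not part of the patch): the snippet below illustrates the call pattern this split enables for a later caller such as the schema merge code mentioned in the commit message. Only prepare_token_metadata_change, commit_token_metadata_change and token_metadata_change::destroy come from this patch; the function name and the elided schema-merge step are hypothetical placeholders.

    // Hypothetical illustration only. Mirrors the pattern of the new
    // replicate_to_all_cores(): stage on shard 0, publish on every shard,
    // then release the staging state.
    future<> storage_service::apply_topology_and_schema_change_example(mutable_token_metadata_ptr tmptr) {
        SCYLLA_ASSERT(this_shard_id() == 0);
        // Stage per-shard token_metadata and effective_replication_maps.
        // Nothing is visible yet, so a failure here leaves the node untouched.
        token_metadata_change change = co_await prepare_token_metadata_change(tmptr);

        // ... schema merge work could run here, before anything is published ...

        // Publish the staged state on every shard (commit is noexcept).
        co_await container().invoke_on_all([&change] (storage_service& ss) {
            ss.commit_token_metadata_change(change);
        });

        // Free the per-shard staging buffers.
        co_await change.destroy();
    }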