storage_service: split replicate_to_all_cores into steps

In later commits, the schema merge code will use these prepare and
commit steps. The rest of the code will continue using
replicate_to_all_cores.
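
For illustration, the two steps compose the way the reworked
replicate_to_all_cores below does (a sketch only; the schema merge
caller arrives in later commits):

    // On shard 0, while holding the token_metadata lock:
    auto change = co_await prepare_token_metadata_change(tmptr);
    // Committing is noexcept and must run on every shard:
    co_await container().invoke_on_all([&change] (storage_service& ss) {
        ss.commit_token_metadata_change(change);
    });
    // Finally, release the per-shard state:
    co_await change.destroy();
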
Marcin Maliszkiewicz
2025-08-05 10:09:06 +02:00
parent b0f11b6d91
commit 106fd39c6c
2 changed files with 101 additions and 72 deletions

service/storage_service.cc

@@ -19,6 +19,7 @@
#include "gc_clock.hh"
#include "raft/raft.hh"
#include <ranges>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sleep.hh>
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service/qos/service_level_controller.hh"
@@ -3207,28 +3208,16 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), start_hm, new_generation);
}
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
future<token_metadata_change> storage_service::prepare_token_metadata_change(mutable_token_metadata_ptr tmptr) {
SCYLLA_ASSERT(this_shard_id() == 0);
slogger.debug("Replicating token_metadata to all cores");
std::exception_ptr ex;
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
pending_token_metadata_ptr.resize(smp::count);
std::vector<std::unordered_map<sstring, locator::static_effective_replication_map_ptr>> pending_effective_replication_maps;
pending_effective_replication_maps.resize(smp::count);
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms;
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms;
pending_table_erms.resize(smp::count);
pending_view_erms.resize(smp::count);
std::unordered_set<session_id> open_sessions;
token_metadata_change change;
// Collect open sessions
{
auto session = _topology_state_machine._topology.session;
if (session) {
open_sessions.insert(session);
change.open_sessions.insert(session);
}
for (auto&& [table, tables] : tmptr->tablets().all_table_groups()) {
@@ -3236,7 +3225,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
for (auto&& [tid, trinfo]: tmap.transitions()) {
if (trinfo.session_id) {
auto id = session_id(trinfo.session_id);
open_sessions.insert(id);
change.open_sessions.insert(id);
}
}
}
@@ -3244,11 +3233,11 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
try {
auto base_shard = this_shard_id();
pending_token_metadata_ptr[base_shard] = tmptr;
change.pending_token_metadata_ptr[base_shard] = tmptr;
auto& sharded_token_metadata = _shared_token_metadata.container();
// clone a local copy of updated token_metadata on all other shards
co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
change.pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
});
// Precalculate new effective_replication_map for all keyspaces
@@ -3265,7 +3254,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
continue;
}
auto erm = co_await get_erm_factory().create_static_effective_replication_map(rs, tmptr);
pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm));
change.pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm));
}
co_await container().invoke_on_others([&] (storage_service& ss) -> future<> {
auto& db = ss._db.local();
@@ -3274,27 +3263,27 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
if (rs->is_per_table()) {
continue;
}
auto tmptr = pending_token_metadata_ptr[this_shard_id()];
auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
auto erm = co_await ss.get_erm_factory().create_static_effective_replication_map(rs, tmptr);
pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm));
change.pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm));
}
});
// Prepare per-table erms.
co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
auto& db = ss._db.local();
auto tmptr = pending_token_metadata_ptr[this_shard_id()];
auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
co_await db.get_tables_metadata().for_each_table_gently([&] (table_id id, lw_shared_ptr<replica::table> table) {
auto rs = db.find_keyspace(table->schema()->ks_name()).get_replication_strategy_ptr();
locator::effective_replication_map_ptr erm;
if (auto pt_rs = rs->maybe_as_per_table()) {
erm = pt_rs->make_replication_map(id, tmptr);
} else {
erm = pending_effective_replication_maps[this_shard_id()][table->schema()->ks_name()];
erm = change.pending_effective_replication_maps[this_shard_id()][table->schema()->ks_name()];
}
if (table->schema()->is_view()) {
pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
change.pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
} else {
pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
change.pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
}
return make_ready_future();
});
@@ -3307,10 +3296,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
if (ex) {
try {
co_await smp::invoke_on_all([&] () -> future<> {
auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]);
auto erms = std::move(pending_effective_replication_maps[this_shard_id()]);
auto table_erms = std::move(pending_table_erms[this_shard_id()]);
auto view_erms = std::move(pending_view_erms[this_shard_id()]);
auto tmptr = std::move(change.pending_token_metadata_ptr[this_shard_id()]);
auto erms = std::move(change.pending_effective_replication_maps[this_shard_id()]);
auto table_erms = std::move(change.pending_table_erms[this_shard_id()]);
auto view_erms = std::move(change.pending_view_erms[this_shard_id()]);
co_await utils::clear_gently(erms);
co_await utils::clear_gently(tmptr);
@@ -3324,58 +3313,58 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
std::rethrow_exception(std::move(ex));
}
// Apply changes on all shards
co_return change;
}
void storage_service::commit_token_metadata_change(token_metadata_change& change) noexcept {
slogger.debug("Replicating token_metadata");
// Apply changes on a single shard
try {
co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()]));
auto& db = ss._db.local();
_shared_token_metadata.set(std::move(change.pending_token_metadata_ptr[this_shard_id()]));
auto& db = _db.local();
auto& erms = pending_effective_replication_maps[this_shard_id()];
for (auto it = erms.begin(); it != erms.end(); ) {
auto& ks = db.find_keyspace(it->first);
ks.update_static_effective_replication_map(std::move(it->second));
it = erms.erase(it);
}
auto& erms = change.pending_effective_replication_maps[this_shard_id()];
for (auto it = erms.begin(); it != erms.end(); ) {
auto& ks = db.find_keyspace(it->first);
ks.update_static_effective_replication_map(std::move(it->second));
it = erms.erase(it);
}
auto& table_erms = pending_table_erms[this_shard_id()];
auto& view_erms = pending_view_erms[this_shard_id()];
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
co_await coroutine::maybe_yield();
// Update base/views effective_replication_maps atomically.
auto& cf = db.find_column_family(it->first);
cf.update_effective_replication_map(std::move(it->second));
for (const auto& view_ptr : cf.views()) {
const auto& view_id = view_ptr->id();
auto view_it = view_erms.find(view_id);
if (view_it == view_erms.end()) {
throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
}
auto& view = db.find_column_family(view_id);
view.update_effective_replication_map(std::move(view_it->second));
if (view.uses_tablets()) {
register_tablet_split_candidate(view_it->first);
}
view_erms.erase(view_it);
auto& table_erms = change.pending_table_erms[this_shard_id()];
auto& view_erms = change.pending_view_erms[this_shard_id()];
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
// Update base/views effective_replication_maps atomically.
auto& cf = db.find_column_family(it->first);
cf.update_effective_replication_map(std::move(it->second));
for (const auto& view_ptr : cf.views()) {
const auto& view_id = view_ptr->id();
auto view_it = view_erms.find(view_id);
if (view_it == view_erms.end()) {
throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
}
if (cf.uses_tablets()) {
register_tablet_split_candidate(it->first);
auto& view = db.find_column_family(view_id);
view.update_effective_replication_map(std::move(view_it->second));
if (view.uses_tablets()) {
register_tablet_split_candidate(view_it->first);
}
it = table_erms.erase(it);
view_erms.erase(view_it);
}
if (!view_erms.empty()) {
throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
if (cf.uses_tablets()) {
register_tablet_split_candidate(it->first);
}
it = table_erms.erase(it);
}
auto& session_mgr = get_topology_session_manager();
session_mgr.initiate_close_of_sessions_except(open_sessions);
for (auto id : open_sessions) {
session_mgr.create_session(id);
}
if (!view_erms.empty()) {
throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
}
auto& gc_state = db.get_compaction_manager().get_shared_tombstone_gc_state();
co_await gc_state.flush_pending_repair_time_update(db);
});
auto& session_mgr = get_topology_session_manager();
session_mgr.initiate_close_of_sessions_except(change.open_sessions);
for (auto id : change.open_sessions) {
session_mgr.create_session(id);
}
} catch (...) {
// Applying the changes on all shards should never fail.
// If it does, we will end up in an inconsistent state that we can't recover from.
@@ -3384,6 +3373,27 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
}
}
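// Drops each shard's leftover references on the shard that owns them,
// yielding between elements via utils::clear_gently().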
future<> token_metadata_change::destroy() {
return smp::invoke_on_all([this] () -> future<> {
pending_token_metadata_ptr[this_shard_id()] = nullptr;
co_await utils::clear_gently(pending_effective_replication_maps[this_shard_id()]);
co_await utils::clear_gently(pending_table_erms[this_shard_id()]);
co_await utils::clear_gently(pending_view_erms[this_shard_id()]);
});
}
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
SCYLLA_ASSERT(this_shard_id() == 0);
slogger.debug("Replicating token_metadata to all cores");
auto change = co_await prepare_token_metadata_change(tmptr);
co_await container().invoke_on_all([&change] (storage_service& ss) {
ss.commit_token_metadata_change(change);
});
co_await change.destroy();
co_await _db.local().get_compaction_manager().get_shared_tombstone_gc_state().
flush_pending_repair_time_update(_db.local());
}
future<> storage_service::stop() {
// if there is a background "isolate" shutdown
// in progress, we need to sync with it. Mostly

service/storage_service.hh

@@ -141,6 +141,16 @@ class node_ops_meta_data;
using start_hint_manager = seastar::bool_class<class start_hint_manager_tag>;
using loosen_constraints = seastar::bool_class<class loosen_constraints_tag>;
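// Per-shard state produced by prepare_token_metadata_change and consumed
// by commit_token_metadata_change. Each vector is sized to smp::count and
// indexed by shard id, so every shard only touches its own slot.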
struct token_metadata_change {
std::vector<locator::mutable_token_metadata_ptr> pending_token_metadata_ptr{smp::count};
std::vector<std::unordered_map<sstring, locator::static_effective_replication_map_ptr>> pending_effective_replication_maps{smp::count};
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms{smp::count};
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms{smp::count};
std::unordered_set<session_id> open_sessions;
future<> destroy();
};
/**
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
@@ -289,6 +299,15 @@ private:
// Note: must be called on shard 0.
future<> mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock aml = acquire_merge_lock::yes) noexcept;
// Prepares a token metadata change without making it visible. Combined with the
// commit function and the appropriate lock, it does exactly the same as mutate_token_metadata.
// Note: prepare_token_metadata_change must be called on shard 0.
future<token_metadata_change> prepare_token_metadata_change(mutable_token_metadata_ptr tmptr);
// Commits prepared token metadata changes. Must be called under token_metadata_lock
// and on all shards.
void commit_token_metadata_change(token_metadata_change& change) noexcept;
// Update pending ranges locally and then replicate to all cores.
// Should be serialized under token_metadata_lock.
// Must be called on shard 0.