diff --git a/replica/database.cc b/replica/database.cc index 57cc6352db..1c3da66b0f 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -97,9 +97,8 @@ make_flush_controller(const db::config& cfg, backlog_controller::scheduling_grou return flush_controller(sg, cfg.memtable_flush_static_shares(), 50ms, cfg.unspooled_dirty_soft_limit(), std::move(fn)); } -keyspace::keyspace(lw_shared_ptr metadata, config cfg, locator::effective_replication_map_factory& erm_factory) - : _metadata(std::move(metadata)) - , _config(std::move(cfg)) +keyspace::keyspace(config cfg, locator::effective_replication_map_factory& erm_factory) + : _config(std::move(cfg)) , _erm_factory(erm_factory) {} @@ -851,26 +850,42 @@ future<> database::modify_keyspace_on_all_shards(sharded& sharded_db, }); } -future<> database::update_keyspace(const keyspace_metadata& tmp_ksm) { - auto& ks = find_keyspace(tmp_ksm.name()); - auto new_ksm = ::make_lw_shared(tmp_ksm.name(), tmp_ksm.strategy_name(), tmp_ksm.strategy_options(), tmp_ksm.initial_tablets(), tmp_ksm.durable_writes(), - ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to(), std::move(ks.metadata()->user_types()), tmp_ksm.get_storage_options()); +future database::prepare_update_keyspace(const keyspace& ks, lw_shared_ptr metadata) const { + auto strategy = keyspace::create_replication_strategy(metadata); + locator::vnode_effective_replication_map_ptr erm = nullptr; + if (!strategy->is_per_table()) { + erm = co_await ks.create_effective_replication_map(strategy, + get_shared_token_metadata()); + } + co_return keyspace_change{ + .metadata = metadata, + .strategy = std::move(strategy), + .erm = std::move(erm), + }; +} +void database::update_keyspace(keyspace_change change) { + auto& ks = find_keyspace(change.metadata->name()); bool old_durable_writes = ks.metadata()->durable_writes(); - bool new_durable_writes = new_ksm->durable_writes(); + bool new_durable_writes = change.metadata->durable_writes(); if (old_durable_writes != new_durable_writes) { - for (auto& [cf_name, cf_schema] : new_ksm->cf_meta_data()) { + for (auto& [cf_name, cf_schema] : change.metadata->cf_meta_data()) { auto& cf = find_column_family(cf_schema); cf.set_durable_writes(new_durable_writes); } } - - co_await ks.update_from(get_shared_token_metadata(), std::move(new_ksm)); + ks.apply(change); } future<> database::update_keyspace_on_all_shards(sharded& sharded_db, const keyspace_metadata& ksm) { - return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) { - return db.update_keyspace(ksm); + return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) -> future<> { + auto& ks = db.find_keyspace(ksm.name()); + auto new_ksm = ::make_lw_shared(ksm.name(), ksm.strategy_name(), ksm.strategy_options(), ksm.initial_tablets(), ksm.durable_writes(), + ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to(), ks.metadata()->user_types(), ksm.get_storage_options()); + + auto change = co_await db.prepare_update_keyspace(ks, new_ksm); + db.update_keyspace(std::move(change)); + co_return; }); } @@ -1247,19 +1262,17 @@ bool database::column_family_exists(const table_id& uuid) const { return _tables_metadata.contains(uuid); } -future<> -keyspace::create_replication_strategy(const locator::shared_token_metadata& stm) { +locator::replication_strategy_ptr +keyspace::create_replication_strategy(lw_shared_ptr metadata) { using namespace locator; - - locator::replication_strategy_params params(_metadata->strategy_options(), _metadata->initial_tablets()); - _replication_strategy = - abstract_replication_strategy::create_replication_strategy(_metadata->strategy_name(), params); + replication_strategy_params params(metadata->strategy_options(), metadata->initial_tablets()); rslogger.debug("replication strategy for keyspace {} is {}, opts={}", - _metadata->name(), _metadata->strategy_name(), _metadata->strategy_options()); - if (!_replication_strategy->is_per_table()) { - auto erm = co_await _erm_factory.create_effective_replication_map(_replication_strategy, stm.get()); - update_effective_replication_map(std::move(erm)); - } + metadata->name(), metadata->strategy_name(), metadata->strategy_options()); + return abstract_replication_strategy::create_replication_strategy(metadata->strategy_name(), params); +} + +future keyspace::create_effective_replication_map(locator::replication_strategy_ptr strategy, const locator::shared_token_metadata& stm) const { + co_return co_await _erm_factory.create_effective_replication_map(strategy, stm.get()); } void @@ -1272,9 +1285,10 @@ keyspace::get_replication_strategy() const { return *_replication_strategy; } -future<> keyspace::update_from(const locator::shared_token_metadata& stm, ::lw_shared_ptr ksm) { - _metadata = std::move(ksm); - return create_replication_strategy(stm); +void keyspace::apply(keyspace_change kc) { + _metadata = std::move(kc.metadata); + _replication_strategy = std::move(kc.strategy); + _effective_replication_map = std::move(kc.erm); } column_family::config @@ -1366,8 +1380,9 @@ std::vector database::get_views() const { future> database::create_in_memory_keyspace(const lw_shared_ptr& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system) { auto kscfg = make_keyspace_config(*ksm, system); - auto ks(std::make_unique(ksm, std::move(kscfg), erm_factory)); - co_await ks->create_replication_strategy(get_shared_token_metadata()); + auto ks(std::make_unique(std::move(kscfg), erm_factory)); + auto change = co_await prepare_update_keyspace(*ks, ksm); + ks->apply(std::move(change)); co_return ks; } diff --git a/replica/database.hh b/replica/database.hh index b9e79c2611..ecabe483dc 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1327,6 +1327,13 @@ using user_types_metadata = data_dictionary::user_types_metadata; using keyspace_metadata = data_dictionary::keyspace_metadata; +// Encapsulates objects needed to update keyspace schema +struct keyspace_change { + lw_shared_ptr metadata; + locator::replication_strategy_ptr strategy; + locator::vnode_effective_replication_map_ptr erm; +}; + class keyspace { public: struct config { @@ -1358,14 +1365,14 @@ private: locator::effective_replication_map_factory& _erm_factory; public: - explicit keyspace(lw_shared_ptr metadata, config cfg, locator::effective_replication_map_factory& erm_factory); + explicit keyspace(config cfg, locator::effective_replication_map_factory& erm_factory); keyspace(const keyspace&) = delete; void operator=(const keyspace&) = delete; keyspace(keyspace&&) = default; future<> shutdown() noexcept; - future<> update_from(const locator::shared_token_metadata& stm, lw_shared_ptr); + void apply(keyspace_change kc); future<> init_storage(); @@ -1374,7 +1381,12 @@ public: * boom, it is replaced. */ lw_shared_ptr metadata() const; - future<> create_replication_strategy(const locator::shared_token_metadata& stm); + + static locator::replication_strategy_ptr create_replication_strategy( + lw_shared_ptr metadata); + future create_effective_replication_map( + locator::replication_strategy_ptr strategy, + const locator::shared_token_metadata& stm) const; void update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm); /** @@ -1657,7 +1669,8 @@ private: void insert_keyspace(std::unique_ptr ks); future<> remove(table&) noexcept; void drop_keyspace(const sstring& name); - future<> update_keyspace(const keyspace_metadata& tmp_ksm); + future prepare_update_keyspace(const keyspace& ks, lw_shared_ptr metadata) const; + void update_keyspace(keyspace_change change); static future<> modify_keyspace_on_all_shards(sharded& sharded_db, std::function(replica::database&)> func); future<> foreach_reader_concurrency_semaphore(std::function(reader_concurrency_semaphore&)> func);