From 2daa630938dc8cf188e54ef471e1a3f6eb8efc74 Mon Sep 17 00:00:00 2001 From: Marcin Maliszkiewicz Date: Fri, 13 Sep 2024 11:39:39 +0200 Subject: [PATCH] replica: split update keyspace into two phases - first phase is preemptive (prepare_update_keyspace) - second phase is non-preemptive (update_keyspace) This is done so that schema change can be applied atomically. Aditionally create keyspace code was changed to share common part with update keyspace flow. This commit doesn't yet change the behaviour of the code, as it doesn't guarantee atomicity, it will be done in following commits. --- replica/database.cc | 73 +++++++++++++++++++++++++++------------------ replica/database.hh | 21 ++++++++++--- 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 57cc6352db..1c3da66b0f 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -97,9 +97,8 @@ make_flush_controller(const db::config& cfg, backlog_controller::scheduling_grou return flush_controller(sg, cfg.memtable_flush_static_shares(), 50ms, cfg.unspooled_dirty_soft_limit(), std::move(fn)); } -keyspace::keyspace(lw_shared_ptr metadata, config cfg, locator::effective_replication_map_factory& erm_factory) - : _metadata(std::move(metadata)) - , _config(std::move(cfg)) +keyspace::keyspace(config cfg, locator::effective_replication_map_factory& erm_factory) + : _config(std::move(cfg)) , _erm_factory(erm_factory) {} @@ -851,26 +850,42 @@ future<> database::modify_keyspace_on_all_shards(sharded& sharded_db, }); } -future<> database::update_keyspace(const keyspace_metadata& tmp_ksm) { - auto& ks = find_keyspace(tmp_ksm.name()); - auto new_ksm = ::make_lw_shared(tmp_ksm.name(), tmp_ksm.strategy_name(), tmp_ksm.strategy_options(), tmp_ksm.initial_tablets(), tmp_ksm.durable_writes(), - ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to(), std::move(ks.metadata()->user_types()), tmp_ksm.get_storage_options()); +future database::prepare_update_keyspace(const keyspace& ks, lw_shared_ptr metadata) const { + auto strategy = keyspace::create_replication_strategy(metadata); + locator::vnode_effective_replication_map_ptr erm = nullptr; + if (!strategy->is_per_table()) { + erm = co_await ks.create_effective_replication_map(strategy, + get_shared_token_metadata()); + } + co_return keyspace_change{ + .metadata = metadata, + .strategy = std::move(strategy), + .erm = std::move(erm), + }; +} +void database::update_keyspace(keyspace_change change) { + auto& ks = find_keyspace(change.metadata->name()); bool old_durable_writes = ks.metadata()->durable_writes(); - bool new_durable_writes = new_ksm->durable_writes(); + bool new_durable_writes = change.metadata->durable_writes(); if (old_durable_writes != new_durable_writes) { - for (auto& [cf_name, cf_schema] : new_ksm->cf_meta_data()) { + for (auto& [cf_name, cf_schema] : change.metadata->cf_meta_data()) { auto& cf = find_column_family(cf_schema); cf.set_durable_writes(new_durable_writes); } } - - co_await ks.update_from(get_shared_token_metadata(), std::move(new_ksm)); + ks.apply(change); } future<> database::update_keyspace_on_all_shards(sharded& sharded_db, const keyspace_metadata& ksm) { - return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) { - return db.update_keyspace(ksm); + return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) -> future<> { + auto& ks = db.find_keyspace(ksm.name()); + auto new_ksm = ::make_lw_shared(ksm.name(), ksm.strategy_name(), ksm.strategy_options(), ksm.initial_tablets(), ksm.durable_writes(), + ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to(), ks.metadata()->user_types(), ksm.get_storage_options()); + + auto change = co_await db.prepare_update_keyspace(ks, new_ksm); + db.update_keyspace(std::move(change)); + co_return; }); } @@ -1247,19 +1262,17 @@ bool database::column_family_exists(const table_id& uuid) const { return _tables_metadata.contains(uuid); } -future<> -keyspace::create_replication_strategy(const locator::shared_token_metadata& stm) { +locator::replication_strategy_ptr +keyspace::create_replication_strategy(lw_shared_ptr metadata) { using namespace locator; - - locator::replication_strategy_params params(_metadata->strategy_options(), _metadata->initial_tablets()); - _replication_strategy = - abstract_replication_strategy::create_replication_strategy(_metadata->strategy_name(), params); + replication_strategy_params params(metadata->strategy_options(), metadata->initial_tablets()); rslogger.debug("replication strategy for keyspace {} is {}, opts={}", - _metadata->name(), _metadata->strategy_name(), _metadata->strategy_options()); - if (!_replication_strategy->is_per_table()) { - auto erm = co_await _erm_factory.create_effective_replication_map(_replication_strategy, stm.get()); - update_effective_replication_map(std::move(erm)); - } + metadata->name(), metadata->strategy_name(), metadata->strategy_options()); + return abstract_replication_strategy::create_replication_strategy(metadata->strategy_name(), params); +} + +future keyspace::create_effective_replication_map(locator::replication_strategy_ptr strategy, const locator::shared_token_metadata& stm) const { + co_return co_await _erm_factory.create_effective_replication_map(strategy, stm.get()); } void @@ -1272,9 +1285,10 @@ keyspace::get_replication_strategy() const { return *_replication_strategy; } -future<> keyspace::update_from(const locator::shared_token_metadata& stm, ::lw_shared_ptr ksm) { - _metadata = std::move(ksm); - return create_replication_strategy(stm); +void keyspace::apply(keyspace_change kc) { + _metadata = std::move(kc.metadata); + _replication_strategy = std::move(kc.strategy); + _effective_replication_map = std::move(kc.erm); } column_family::config @@ -1366,8 +1380,9 @@ std::vector database::get_views() const { future> database::create_in_memory_keyspace(const lw_shared_ptr& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system) { auto kscfg = make_keyspace_config(*ksm, system); - auto ks(std::make_unique(ksm, std::move(kscfg), erm_factory)); - co_await ks->create_replication_strategy(get_shared_token_metadata()); + auto ks(std::make_unique(std::move(kscfg), erm_factory)); + auto change = co_await prepare_update_keyspace(*ks, ksm); + ks->apply(std::move(change)); co_return ks; } diff --git a/replica/database.hh b/replica/database.hh index b9e79c2611..ecabe483dc 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1327,6 +1327,13 @@ using user_types_metadata = data_dictionary::user_types_metadata; using keyspace_metadata = data_dictionary::keyspace_metadata; +// Encapsulates objects needed to update keyspace schema +struct keyspace_change { + lw_shared_ptr metadata; + locator::replication_strategy_ptr strategy; + locator::vnode_effective_replication_map_ptr erm; +}; + class keyspace { public: struct config { @@ -1358,14 +1365,14 @@ private: locator::effective_replication_map_factory& _erm_factory; public: - explicit keyspace(lw_shared_ptr metadata, config cfg, locator::effective_replication_map_factory& erm_factory); + explicit keyspace(config cfg, locator::effective_replication_map_factory& erm_factory); keyspace(const keyspace&) = delete; void operator=(const keyspace&) = delete; keyspace(keyspace&&) = default; future<> shutdown() noexcept; - future<> update_from(const locator::shared_token_metadata& stm, lw_shared_ptr); + void apply(keyspace_change kc); future<> init_storage(); @@ -1374,7 +1381,12 @@ public: * boom, it is replaced. */ lw_shared_ptr metadata() const; - future<> create_replication_strategy(const locator::shared_token_metadata& stm); + + static locator::replication_strategy_ptr create_replication_strategy( + lw_shared_ptr metadata); + future create_effective_replication_map( + locator::replication_strategy_ptr strategy, + const locator::shared_token_metadata& stm) const; void update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm); /** @@ -1657,7 +1669,8 @@ private: void insert_keyspace(std::unique_ptr ks); future<> remove(table&) noexcept; void drop_keyspace(const sstring& name); - future<> update_keyspace(const keyspace_metadata& tmp_ksm); + future prepare_update_keyspace(const keyspace& ks, lw_shared_ptr metadata) const; + void update_keyspace(keyspace_change change); static future<> modify_keyspace_on_all_shards(sharded& sharded_db, std::function(replica::database&)> func); future<> foreach_reader_concurrency_semaphore(std::function(reader_concurrency_semaphore&)> func);