mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-09 08:23:29 +00:00
replica: split update keyspace into two phases
- first phase is preemptive (prepare_update_keyspace) - second phase is non-preemptive (update_keyspace) This is done so that schema change can be applied atomically. Aditionally create keyspace code was changed to share common part with update keyspace flow. This commit doesn't yet change the behaviour of the code, as it doesn't guarantee atomicity, it will be done in following commits.
This commit is contained in:
@@ -97,9 +97,8 @@ make_flush_controller(const db::config& cfg, backlog_controller::scheduling_grou
|
||||
return flush_controller(sg, cfg.memtable_flush_static_shares(), 50ms, cfg.unspooled_dirty_soft_limit(), std::move(fn));
|
||||
}
|
||||
|
||||
keyspace::keyspace(lw_shared_ptr<keyspace_metadata> metadata, config cfg, locator::effective_replication_map_factory& erm_factory)
|
||||
: _metadata(std::move(metadata))
|
||||
, _config(std::move(cfg))
|
||||
keyspace::keyspace(config cfg, locator::effective_replication_map_factory& erm_factory)
|
||||
: _config(std::move(cfg))
|
||||
, _erm_factory(erm_factory)
|
||||
{}
|
||||
|
||||
@@ -851,26 +850,42 @@ future<> database::modify_keyspace_on_all_shards(sharded<database>& sharded_db,
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::update_keyspace(const keyspace_metadata& tmp_ksm) {
|
||||
auto& ks = find_keyspace(tmp_ksm.name());
|
||||
auto new_ksm = ::make_lw_shared<keyspace_metadata>(tmp_ksm.name(), tmp_ksm.strategy_name(), tmp_ksm.strategy_options(), tmp_ksm.initial_tablets(), tmp_ksm.durable_writes(),
|
||||
ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to<std::vector>(), std::move(ks.metadata()->user_types()), tmp_ksm.get_storage_options());
|
||||
future<keyspace_change> database::prepare_update_keyspace(const keyspace& ks, lw_shared_ptr<keyspace_metadata> metadata) const {
|
||||
auto strategy = keyspace::create_replication_strategy(metadata);
|
||||
locator::vnode_effective_replication_map_ptr erm = nullptr;
|
||||
if (!strategy->is_per_table()) {
|
||||
erm = co_await ks.create_effective_replication_map(strategy,
|
||||
get_shared_token_metadata());
|
||||
}
|
||||
co_return keyspace_change{
|
||||
.metadata = metadata,
|
||||
.strategy = std::move(strategy),
|
||||
.erm = std::move(erm),
|
||||
};
|
||||
}
|
||||
|
||||
void database::update_keyspace(keyspace_change change) {
|
||||
auto& ks = find_keyspace(change.metadata->name());
|
||||
bool old_durable_writes = ks.metadata()->durable_writes();
|
||||
bool new_durable_writes = new_ksm->durable_writes();
|
||||
bool new_durable_writes = change.metadata->durable_writes();
|
||||
if (old_durable_writes != new_durable_writes) {
|
||||
for (auto& [cf_name, cf_schema] : new_ksm->cf_meta_data()) {
|
||||
for (auto& [cf_name, cf_schema] : change.metadata->cf_meta_data()) {
|
||||
auto& cf = find_column_family(cf_schema);
|
||||
cf.set_durable_writes(new_durable_writes);
|
||||
}
|
||||
}
|
||||
|
||||
co_await ks.update_from(get_shared_token_metadata(), std::move(new_ksm));
|
||||
ks.apply(change);
|
||||
}
|
||||
|
||||
future<> database::update_keyspace_on_all_shards(sharded<database>& sharded_db, const keyspace_metadata& ksm) {
|
||||
return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) {
|
||||
return db.update_keyspace(ksm);
|
||||
return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) -> future<> {
|
||||
auto& ks = db.find_keyspace(ksm.name());
|
||||
auto new_ksm = ::make_lw_shared<keyspace_metadata>(ksm.name(), ksm.strategy_name(), ksm.strategy_options(), ksm.initial_tablets(), ksm.durable_writes(),
|
||||
ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to<std::vector>(), ks.metadata()->user_types(), ksm.get_storage_options());
|
||||
|
||||
auto change = co_await db.prepare_update_keyspace(ks, new_ksm);
|
||||
db.update_keyspace(std::move(change));
|
||||
co_return;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1247,19 +1262,17 @@ bool database::column_family_exists(const table_id& uuid) const {
|
||||
return _tables_metadata.contains(uuid);
|
||||
}
|
||||
|
||||
future<>
|
||||
keyspace::create_replication_strategy(const locator::shared_token_metadata& stm) {
|
||||
locator::replication_strategy_ptr
|
||||
keyspace::create_replication_strategy(lw_shared_ptr<keyspace_metadata> metadata) {
|
||||
using namespace locator;
|
||||
|
||||
locator::replication_strategy_params params(_metadata->strategy_options(), _metadata->initial_tablets());
|
||||
_replication_strategy =
|
||||
abstract_replication_strategy::create_replication_strategy(_metadata->strategy_name(), params);
|
||||
replication_strategy_params params(metadata->strategy_options(), metadata->initial_tablets());
|
||||
rslogger.debug("replication strategy for keyspace {} is {}, opts={}",
|
||||
_metadata->name(), _metadata->strategy_name(), _metadata->strategy_options());
|
||||
if (!_replication_strategy->is_per_table()) {
|
||||
auto erm = co_await _erm_factory.create_effective_replication_map(_replication_strategy, stm.get());
|
||||
update_effective_replication_map(std::move(erm));
|
||||
}
|
||||
metadata->name(), metadata->strategy_name(), metadata->strategy_options());
|
||||
return abstract_replication_strategy::create_replication_strategy(metadata->strategy_name(), params);
|
||||
}
|
||||
|
||||
future<locator::vnode_effective_replication_map_ptr> keyspace::create_effective_replication_map(locator::replication_strategy_ptr strategy, const locator::shared_token_metadata& stm) const {
|
||||
co_return co_await _erm_factory.create_effective_replication_map(strategy, stm.get());
|
||||
}
|
||||
|
||||
void
|
||||
@@ -1272,9 +1285,10 @@ keyspace::get_replication_strategy() const {
|
||||
return *_replication_strategy;
|
||||
}
|
||||
|
||||
future<> keyspace::update_from(const locator::shared_token_metadata& stm, ::lw_shared_ptr<keyspace_metadata> ksm) {
|
||||
_metadata = std::move(ksm);
|
||||
return create_replication_strategy(stm);
|
||||
void keyspace::apply(keyspace_change kc) {
|
||||
_metadata = std::move(kc.metadata);
|
||||
_replication_strategy = std::move(kc.strategy);
|
||||
_effective_replication_map = std::move(kc.erm);
|
||||
}
|
||||
|
||||
column_family::config
|
||||
@@ -1366,8 +1380,9 @@ std::vector<view_ptr> database::get_views() const {
|
||||
|
||||
future<std::unique_ptr<keyspace>> database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system) {
|
||||
auto kscfg = make_keyspace_config(*ksm, system);
|
||||
auto ks(std::make_unique<keyspace>(ksm, std::move(kscfg), erm_factory));
|
||||
co_await ks->create_replication_strategy(get_shared_token_metadata());
|
||||
auto ks(std::make_unique<keyspace>(std::move(kscfg), erm_factory));
|
||||
auto change = co_await prepare_update_keyspace(*ks, ksm);
|
||||
ks->apply(std::move(change));
|
||||
co_return ks;
|
||||
}
|
||||
|
||||
|
||||
@@ -1327,6 +1327,13 @@ using user_types_metadata = data_dictionary::user_types_metadata;
|
||||
|
||||
using keyspace_metadata = data_dictionary::keyspace_metadata;
|
||||
|
||||
// Encapsulates objects needed to update keyspace schema
|
||||
struct keyspace_change {
|
||||
lw_shared_ptr<keyspace_metadata> metadata;
|
||||
locator::replication_strategy_ptr strategy;
|
||||
locator::vnode_effective_replication_map_ptr erm;
|
||||
};
|
||||
|
||||
class keyspace {
|
||||
public:
|
||||
struct config {
|
||||
@@ -1358,14 +1365,14 @@ private:
|
||||
locator::effective_replication_map_factory& _erm_factory;
|
||||
|
||||
public:
|
||||
explicit keyspace(lw_shared_ptr<keyspace_metadata> metadata, config cfg, locator::effective_replication_map_factory& erm_factory);
|
||||
explicit keyspace(config cfg, locator::effective_replication_map_factory& erm_factory);
|
||||
keyspace(const keyspace&) = delete;
|
||||
void operator=(const keyspace&) = delete;
|
||||
keyspace(keyspace&&) = default;
|
||||
|
||||
future<> shutdown() noexcept;
|
||||
|
||||
future<> update_from(const locator::shared_token_metadata& stm, lw_shared_ptr<keyspace_metadata>);
|
||||
void apply(keyspace_change kc);
|
||||
|
||||
future<> init_storage();
|
||||
|
||||
@@ -1374,7 +1381,12 @@ public:
|
||||
* boom, it is replaced.
|
||||
*/
|
||||
lw_shared_ptr<keyspace_metadata> metadata() const;
|
||||
future<> create_replication_strategy(const locator::shared_token_metadata& stm);
|
||||
|
||||
static locator::replication_strategy_ptr create_replication_strategy(
|
||||
lw_shared_ptr<keyspace_metadata> metadata);
|
||||
future<locator::vnode_effective_replication_map_ptr> create_effective_replication_map(
|
||||
locator::replication_strategy_ptr strategy,
|
||||
const locator::shared_token_metadata& stm) const;
|
||||
void update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm);
|
||||
|
||||
/**
|
||||
@@ -1657,7 +1669,8 @@ private:
|
||||
void insert_keyspace(std::unique_ptr<keyspace> ks);
|
||||
future<> remove(table&) noexcept;
|
||||
void drop_keyspace(const sstring& name);
|
||||
future<> update_keyspace(const keyspace_metadata& tmp_ksm);
|
||||
future<keyspace_change> prepare_update_keyspace(const keyspace& ks, lw_shared_ptr<keyspace_metadata> metadata) const;
|
||||
void update_keyspace(keyspace_change change);
|
||||
static future<> modify_keyspace_on_all_shards(sharded<database>& sharded_db, std::function<future<>(replica::database&)> func);
|
||||
|
||||
future<> foreach_reader_concurrency_semaphore(std::function<future<>(reader_concurrency_semaphore&)> func);
|
||||
|
||||
Reference in New Issue
Block a user