mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-23 01:50:35 +00:00
service: handle multi_rf_change
Extend the keyspace_rf_change handler to also handle multi_rf_change. A multi_rf_change is allowed only if we add or remove DCs and the keyspace uses a rack-list replication factor. The handler adds the request id to topology::ongoing_rf_changes; the request is then further processed by the load balancer.
This commit is contained in:
@@ -552,6 +552,19 @@ static std::unordered_map<sstring, std::vector<sstring>> subtract_replication(co
|
||||
return res;
|
||||
}
|
||||
|
||||
// Returns true iff both replication-option maps contain exactly the same set
// of keys (DCs) and every key maps to an equal replication factor.
// NOTE(review): entries are compared via get_replication_factor(), so values
// that differ textually but denote the same RF still compare equal.
bool rf_count_per_dc_equals(const locator::replication_strategy_config_options& current, const locator::replication_strategy_config_options& next) {
    // Different numbers of DCs can never match.
    if (current.size() != next.size()) {
        return false;
    }
    for (auto it = current.begin(); it != current.end(); ++it) {
        const auto match = next.find(it->first);
        if (match == next.end()) {
            // DC present in `current` but absent from `next`.
            return false;
        }
        if (get_replication_factor(it->second) != get_replication_factor(match->second)) {
            // Same DC, different replication factor.
            return false;
        }
    }
    return true;
}
|
||||
|
||||
/// The algorithm aims to equalize tablet count on each shard.
|
||||
/// This goal is based on the assumption that every shard has similar processing power and space capacity,
|
||||
/// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "tablet_allocator_fwd.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include <seastar/core/metrics.hh>
|
||||
@@ -356,6 +357,8 @@ future<bool> requires_rack_list_colocation(
|
||||
db::system_keyspace* sys_ks,
|
||||
utils::UUID request_id);
|
||||
|
||||
bool rf_count_per_dc_equals(const locator::replication_strategy_config_options& current, const locator::replication_strategy_config_options& next);
|
||||
|
||||
}
|
||||
|
||||
template <>
|
||||
|
||||
@@ -977,6 +977,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
// How a keyspace RF-change request should be executed; selected by
// choose_keyspace_rf_change_kind().
enum class keyspace_rf_change_kind {
    // Ordinary RF change handled by the default procedure.
    default_rf_change,
    // The keyspace must first be converted to rack-list replication
    // (chosen when requires_rack_list_colocation() reports colocation is
    // needed for tables with materialized views).
    conversion_to_rack_list,
    // Several per-DC RFs change at once; only taken when DCs are purely
    // added or removed (0 <-> N) on a keyspace using rack-list RF
    // exclusively, and the cluster feature is enabled. The change is then
    // carried out asynchronously by the load balancer.
    multi_rf_change
};
|
||||
|
||||
future<keyspace_rf_change_kind> choose_keyspace_rf_change_kind(utils::UUID req_id,
|
||||
@@ -998,6 +999,25 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
}
|
||||
co_return rack_list_conversion ? co_await requires_rack_list_colocation(_db, get_token_metadata_ptr(), &_sys_ks, req_id) : false;
|
||||
};
|
||||
auto all_changes_are_0_N = [&] {
|
||||
auto all_dcs = old_replication_strategy_config | std::views::keys;
|
||||
auto new_dcs = new_replication_strategy_config | std::views::keys;
|
||||
std::set<sstring> dcs(all_dcs.begin(), all_dcs.end());
|
||||
dcs.insert(new_dcs.begin(), new_dcs.end());
|
||||
for (const auto& dc : dcs) {
|
||||
auto old_it = old_replication_strategy_config.find(dc);
|
||||
auto new_it = new_replication_strategy_config.find(dc);
|
||||
size_t old_rf = (old_it != old_replication_strategy_config.end()) ? locator::get_replication_factor(old_it->second) : 0;
|
||||
size_t new_rf = (new_it != new_replication_strategy_config.end()) ? locator::get_replication_factor(new_it->second) : 0;
|
||||
if (old_rf == new_rf) {
|
||||
continue;
|
||||
}
|
||||
if (old_rf != 0 && new_rf != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
if (tables_with_mvs.empty()) {
|
||||
co_return keyspace_rf_change_kind::default_rf_change;
|
||||
@@ -1005,6 +1025,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
if (co_await check_needs_colocation()) {
|
||||
co_return keyspace_rf_change_kind::conversion_to_rack_list;
|
||||
}
|
||||
if (_feature_service.keyspace_multi_rf_change && locator::uses_rack_list_exclusively(old_replication_strategy_config) && locator::uses_rack_list_exclusively(new_replication_strategy_config) && !rf_count_per_dc_equals(old_replication_strategy_config, new_replication_strategy_config) && all_changes_are_0_N()) {
|
||||
co_return keyspace_rf_change_kind::multi_rf_change;
|
||||
}
|
||||
co_return keyspace_rf_change_kind::default_rf_change;
|
||||
}
|
||||
|
||||
@@ -1158,6 +1181,20 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
updates.push_back(canonical_mutation(tbuilder.build()));
|
||||
break;
|
||||
}
|
||||
case keyspace_rf_change_kind::multi_rf_change: {
|
||||
rtlogger.info("keyspace_rf_change for keyspace {} will use multi-rf change procedure", ks_name);
|
||||
ks_md->set_next_strategy_options(ks_md->strategy_options());
|
||||
ks_md->set_strategy_options(ks.metadata()->strategy_options()); // start from the old strategy
|
||||
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
|
||||
for (auto& m: schema_muts) {
|
||||
updates.emplace_back(m);
|
||||
}
|
||||
|
||||
topology_mutation_builder tbuilder = tbuilder_with_request_drop();
|
||||
tbuilder.start_rf_change_migrations(req_id);
|
||||
updates.push_back(canonical_mutation(tbuilder.build()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
|
||||
Reference in New Issue
Block a user