cql3: ks_prop_defs: Expand numeric RF to rack list

Auto-expands numeric RF in CREATE/ALTER KEYSPACE statements for
new DCs specified in the statement.

Doesn't auto-expand existing options, as the rack choice may not be in
line with current replica placement. This requires co-locating tablet
replicas, and tracking of co-location state, which is not implemented yet.

Signed-off-by: Tomasz Grabiec <tgrabiec@scylladb.com>
This commit is contained in:
Tomasz Grabiec
2025-05-22 11:32:35 +03:00
parent 35166809cb
commit 28f6bdc99b
11 changed files with 167 additions and 35 deletions

View File

@@ -65,7 +65,7 @@ future<> audit_cf_storage_helper::migrate_audit_table(service::group0_guard grou
data_dictionary::database db = _qp.db();
cql3::statements::ks_prop_defs old_ks_prop_defs;
auto old_ks_metadata = old_ks_prop_defs.as_ks_metadata_update(
ks->metadata(), *_qp.proxy().get_token_metadata_ptr(), db.features());
ks->metadata(), *_qp.proxy().get_token_metadata_ptr(), db.features(), db.get_config());
locator::replication_strategy_config_options strategy_opts;
for (const auto &dc: _qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters())
strategy_opts[dc] = "3";

View File

@@ -79,7 +79,7 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
current_options.type_string(), new_options.type_string()));
}
auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());
auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features(), qp.db().get_config());
auto tmptr = qp.proxy().get_token_metadata_ptr();
const auto& topo = tmptr->get_topology();
@@ -145,7 +145,7 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
const auto tmptr = qp.proxy().get_token_metadata_ptr();
const auto& topo = tmptr->get_topology();
const auto& feat = qp.proxy().features();
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, *tmptr, feat);
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, *tmptr, feat, qp.db().get_config());
utils::chunked_vector<mutation> muts;
std::vector<sstring> warnings;

View File

@@ -19,6 +19,7 @@
#include "exceptions/exceptions.hh"
#include "gms/feature_service.hh"
#include "db/config.hh"
#include <random>
namespace cql3 {
@@ -26,9 +27,74 @@ namespace statements {
static logging::logger logger("ks_prop_defs");
// Expands the replication factor specified for `dc` into an explicit rack
// list when it is numeric. Racks are drawn from those currently allowed for
// replica placement in that DC. `old_options` are the keyspace's existing
// (pre-ALTER) strategy options; they gate which numeric<->rack-list
// transitions are permitted (see the "Handle ALTER" table below).
// Returns `rf` unchanged when it is already a rack list.
// Throws exceptions::configuration_exception for an unknown DC, an RF larger
// than the number of racks, or a disallowed ALTER transition.
static
locator::replication_strategy_config_option
expand_to_racks(const locator::token_metadata& tm,
                const sstring& dc,
                const locator::replication_strategy_config_option& rf,
                const locator::replication_strategy_config_options& old_options)
{
    // Racks eligible for replica placement in this DC.
    auto dc_racks = locator::get_allowed_racks(tm, dc);
    logger.debug("expand_to_racks: dc={} rf={} allowed_racks={}", dc, rf, dc_racks);
    if (!tm.get_topology().get_datacenters().contains(dc)) {
        throw exceptions::configuration_exception(fmt::format("Unrecognized datacenter name '{}'", dc));
    }
    auto data = locator::abstract_replication_strategy::parse_replication_factor(rf);
    // If `rf` is a rack list, this checks that every listed rack is allowed.
    data.validate(std::ranges::to<std::unordered_set<sstring>>(dc_racks));
    if (data.is_rack_based()) {
        // Already an explicit rack list; nothing to expand.
        return rf;
    }
    if (data.count() == 0) {
        // Numeric RF of 0 maps to the empty rack list.
        return locator::rack_list();
    }
    if (data.count() > dc_racks.size()) {
        throw exceptions::configuration_exception(fmt::format(
            "Replication factor {} exceeds the number of racks ({}) in dc {}", data.count(), dc_racks.size(), dc));
    }
    // Handle ALTER:
    // ([]|0) -> numeric is allowed, there are no existing replicas
    // numeric -> numeric' is not supported. User should convert RF to rack list of equal count first.
    // rack_list -> len(rack_list) is allowed (no-op)
    // rack_list -> numeric is not allowed
    if (old_options.contains(dc)) {
        auto& old_rf_val = old_options.at(dc);
        auto old_rf = locator::replication_factor_data(old_rf_val);
        if (old_rf.is_rack_based()) {
            if (old_rf.count() == data.count()) {
                // Numeric RF equal to the current rack list's length:
                // keep the existing rack list (a placement no-op).
                return old_rf_val;
            } else if (old_rf.count() > 0) {
                throw exceptions::configuration_exception(fmt::format(
                    "Cannot change replication factor for '{}' from {} to numeric {}, use rack list instead",
                    dc, old_rf_val, data.count()));
            }
            // Old rack list is empty (no existing replicas): fall through
            // and expand the numeric RF below.
        } else if (old_rf.count() > 0) {
            throw exceptions::configuration_exception(fmt::format(
                "Cannot change replication factor for '{}' from {} to {}, only rack list is allowed",
                dc, old_rf_val, data.count()));
        }
    }
    // If the replication factor is less than the number of racks, pick rf racks at random.
    if (data.count() < dc_racks.size()) {
        static thread_local auto gen = std::default_random_engine(std::random_device{}());
        std::ranges::shuffle(dc_racks, gen);
        dc_racks.resize(data.count());
    }
    return dc_racks;
}
static locator::replication_strategy_config_options prepare_options(
const sstring& strategy_class,
const locator::token_metadata& tm,
bool rf_rack_valid_keyspaces,
locator::replication_strategy_config_options options,
const locator::replication_strategy_config_options& old_options,
bool rack_list_enabled,
@@ -37,8 +103,11 @@ static locator::replication_strategy_config_options prepare_options(
auto is_nts = locator::abstract_replication_strategy::to_qualified_class_name(strategy_class) == "org.apache.cassandra.locator.NetworkTopologyStrategy";
auto is_alter = !old_options.empty();
const auto& all_dcs = tm.get_datacenter_racks_token_owners();
auto auto_expand_racks = uses_tablets && rf_rack_valid_keyspaces && rack_list_enabled;
logger.debug("prepare_options: {}: is_nts={} old_options={} new_options={}", strategy_class, is_nts, old_options, options);
logger.debug("prepare_options: {}: is_nts={} auto_expand_racks={} rack_list_enabled={} old_options={} new_options={} all_dcs={}",
strategy_class, is_nts, auto_expand_racks, rack_list_enabled, old_options, options, all_dcs);
if (!is_nts) {
return options;
@@ -48,7 +117,6 @@ static locator::replication_strategy_config_options prepare_options(
// If the user simply switches from another strategy without providing any options,
// but the other strategy used the 'replication_factor' option, it will also be expanded.
// See issue CASSANDRA-14303.
std::optional<sstring> rf;
auto it = options.find(ks_prop_defs::REPLICATION_FACTOR_KEY);
if (it != options.end()) {
@@ -115,6 +183,18 @@ static locator::replication_strategy_config_options prepare_options(
}
}
if (!rf && options.empty() && old_options.empty()) {
if (all_dcs.empty()) {
throw request_validations::invalid_request("No data centers found in the cluster, cannot determine replication factor");
}
for (const auto& [dc, racks_map] : all_dcs) {
if (racks_map.empty()) {
continue;
}
options.emplace(dc, std::to_string(racks_map.size()));
}
}
if (rf.has_value()) {
locator::replication_factor_data::parse(*rf);
@@ -128,17 +208,11 @@ static locator::replication_strategy_config_options prepare_options(
for (const auto& dc : tm.get_topology().get_datacenters()) {
options.emplace(dc, *rf);
}
} else if (options.empty() && old_options.empty()) {
// For default replication factor consider only racks with nodes that are NOT zero-token only nodes,
auto dc_racks = tm.get_datacenter_racks_token_owners();
if (dc_racks.empty()) {
throw request_validations::invalid_request("No data centers found in the cluster, cannot determine replication factor");
}
for (const auto& [dc, racks_map] : dc_racks) {
if (racks_map.empty()) {
continue;
}
options.emplace(dc, std::to_string(racks_map.size()));
}
if (auto_expand_racks) {
for (const auto& [dc, dc_rf] : options) {
options[dc] = expand_to_racks(tm, dc, dc_rf, old_options);
}
}
@@ -327,12 +401,12 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(s
auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets());
bool uses_tablets = initial_tablets.has_value();
bool rack_list_enabled = feat.rack_list_rf;
auto options = prepare_options(sc, tm, get_replication_options(), {}, rack_list_enabled, uses_tablets);
auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), {}, rack_list_enabled, uses_tablets);
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
std::move(options), initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
}
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata& tm, const gms::feature_service& feat) {
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata& tm, const gms::feature_service& feat, const db::config& cfg) {
locator::replication_strategy_config_options options;
const auto& old_options = old->strategy_options();
// if tablets options have not been specified, inherit them if it's tablets-enabled KS
@@ -344,7 +418,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
auto sc = get_replication_strategy_class();
bool rack_list_enabled = feat.rack_list_rf;
if (sc) {
options = prepare_options(*sc, tm, get_replication_options(), old_options, rack_list_enabled, uses_tablets);
options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), old_options, rack_list_enabled, uses_tablets);
} else {
sc = old->strategy_name();
options = old_options;

View File

@@ -80,7 +80,7 @@ public:
data_dictionary::storage_options get_storage_options() const;
bool get_durable_writes() const;
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&, const db::config&);
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&, const db::config&);
};
}

View File

@@ -190,6 +190,18 @@ then every rack in every datacenter receives a replica, except for racks compris
of only :doc:`zero-token nodes </architecture/zero-token-nodes>`. Racks added after
the keyspace creation do not receive replicas.
When ``rf_rack_valid_keyspaces`` is enabled in the config and the keyspace is tablet-based,
the numeric replication factor is automatically expanded into a rack list when the statement is
executed, which can be observed in the DESCRIBE output afterwards. If the numeric RF is smaller than
the number of racks in a DC, a subset of racks is chosen arbitrarily.
Altering from a rack list to a numeric replication factor is not supported, except
for two cases. One is setting replication factor to 0, in which case the number of replicas is reduced to 0 in that DC.
The other is when the numeric replication factor is equal to the current number of replicas
for a given datacenter, in which case the current rack list is preserved.
Altering from a numeric replication factor to a rack list is not supported yet.
Note that when ``ALTER`` ing keyspaces and supplying ``replication_factor``,
auto-expansion will only *add* new datacenters for safety, it will not alter
existing datacenters or remove any even if they are no longer in the cluster.

View File

@@ -1373,6 +1373,26 @@ void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr
tablet_logger.debug("[assert_rf_rack_valid_keyspace]: Keyspace '{}' has been verified to be RF-rack-valid", ks);
}
// Returns the racks of `dc` that may receive replicas: racks that own tokens
// and contain at least one node in the "normal" state. An unknown DC (or one
// with no token owners) yields an empty list.
rack_list get_allowed_racks(const locator::token_metadata& tm, const sstring& dc) {
    // Only consider racks with token-owning nodes, so racks made up purely of
    // zero-token nodes are never offered for placement.
    const auto& racks_by_dc = tm.get_datacenter_racks_token_owners();
    const auto dc_entry = racks_by_dc.find(dc);
    if (dc_entry == racks_by_dc.end()) {
        return {};
    }
    const auto& topology = tm.get_topology();
    // Number of nodes in `rack` (within `dc`) that are in the normal state.
    auto normal_node_count = [&] (const sstring& rack) {
        int normal = 0;
        for (const auto& node_ref : topology.get_datacenter_rack_nodes().at(dc).at(rack)) {
            normal += int(node_ref.get().is_normal());
        }
        return normal;
    };
    rack_list allowed;
    for (const auto& rack_entry : dc_entry->second) {
        if (normal_node_count(rack_entry.first) > 0) {
            allowed.push_back(rack_entry.first);
        }
    }
    return allowed;
}
}
auto fmt::formatter<locator::resize_decision_way>::format(const locator::resize_decision_way& way, fmt::format_context& ctx) const

View File

@@ -16,6 +16,7 @@
#include "dht/i_partitioner_fwd.hh"
#include "dht/token-sharding.hh"
#include "dht/ring_position.hh"
#include "locator/topology.hh"
#include "schema/schema_fwd.hh"
#include "utils/chunked_vector.hh"
#include "utils/hash.hh"
@@ -855,6 +856,9 @@ class abstract_replication_strategy;
/// * The keyspace need not exist. We use its name purely for informational reasons (in error messages).
void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr, const abstract_replication_strategy&);
/// Returns the list of racks that can be used for placing replicas in a given DC.
rack_list get_allowed_racks(const locator::token_metadata&, const sstring& dc);
}
template <>

View File

@@ -988,7 +988,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto tmptr = get_token_metadata_ptr();
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
new_ks_props.validate();
auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, _db.features());
auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, _db.features(), _db.get_config());
size_t unimportant_init_tablet_count = 2; // must be a power of 2
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};

View File

@@ -992,18 +992,18 @@ SEASTAR_TEST_CASE(test_rack_list_rf) {
BOOST_REQUIRE(describe(e, "ks22").contains("'dc2': ['rack2a', 'rack2b']"));
}
// Two DCs, one using rack list, one using numeric RF
// No auto-expansion to rack list.
// Two DCs, one using rack list, one using numeric RF (auto-expanded)
{
e.execute_cql("CREATE KEYSPACE ks2n2 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 2, "
"'dc2': ['rack2a', 'rack2b']} AND tablets = {'enabled':true}").get();
auto& opts = e.local_db().find_keyspace("ks2n2").get_replication_strategy().get_config_options();
BOOST_REQUIRE(replication_factor_data(opts.at("dc1")).is_numeric());
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc1")).get_rack_list(),
std::vector<sstring>({"rack1a", "rack1b"}));
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc2")).get_rack_list(),
std::vector<sstring>({"rack2a", "rack2b"}));
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc1")).count(), 2);
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc2")).count(), 2);
BOOST_REQUIRE(describe(e, "ks2n2").contains("'dc1': '2'"));
BOOST_REQUIRE(describe(e, "ks2n2").contains("'dc1': ['rack1a', 'rack1b']"));
BOOST_REQUIRE(describe(e, "ks2n2").contains("'dc2': ['rack2a', 'rack2b']"));
}
@@ -1066,6 +1066,29 @@ SEASTAR_TEST_CASE(test_rack_list_rejected_when_feature_not_enabled) {
}, cfg);
}
// Verifies that once the RACK_LIST_RF feature is active, ALTER KEYSPACE may
// not change one numeric RF to another numeric RF; the user must switch to an
// explicit rack list instead.
SEASTAR_TEST_CASE(test_altering_to_numeric_forbidden) {
    auto cfg = cql_test_config();
    cfg.db_config->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
    // Create the keyspace with the feature disabled so its RF stays numeric
    // (no auto-expansion to a rack list at CREATE time).
    cfg.disabled_features.insert("RACK_LIST_RF");
    return do_with_cql_env_thread([] (auto& e) {
        topology_builder topo(e);
        unsigned shard_count = 1;
        // dc1 with two racks, one normal node in each.
        topo.start_new_dc({"dc1", "rack1a"});
        topo.add_node(service::node_state::normal, shard_count);
        topo.start_new_rack("rack1b");
        topo.add_node(service::node_state::normal, shard_count);
        e.execute_cql("CREATE KEYSPACE ks1 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 1}").get();
        // Turn the feature on after creation, then attempt a numeric->numeric RF change.
        e.get_feature_service().local().rack_list_rf.enable();
        // Only altering to rack list should be allowed once rf_rack_valid is enabled.
        BOOST_REQUIRE_THROW(e.execute_cql(
            "ALTER KEYSPACE ks1 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 2}").get(),
            exceptions::configuration_exception);
    }, cfg);
}
SEASTAR_TEST_CASE(test_rack_list_rejected_when_using_vnodes) {
auto cfg = cql_test_config();
return do_with_cql_env_thread([] (auto& e) {

View File

@@ -1965,7 +1965,7 @@ alter_result alter_replication(cql_test_env& e,
new_ks_props.add_property("replication", alter_options);
new_ks_props.validate();
BOOST_REQUIRE(new_ks_props.get_replication_strategy_class().has_value());
auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, e.local_db().features());
auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, e.local_db().features(), e.local_db().get_config());
auto new_options = ks_md->strategy_options();
testlog.info("Altering {} from {} using {} to {}", ks_name, rs.get_config_options(), alter_options, new_options);

View File

@@ -189,10 +189,7 @@ async def test_create_and_alter_keyspace_with_altering_rf_and_racks(manager: Man
async def alter_fail(ks: str, rfs: List[int], failed_dc: int, rack_count: int) -> None:
rf = rfs[failed_dc - 1]
err = rf"Replication factor {rf} exceeds the number of racks|The option `rf_rack_valid_keyspaces` is enabled. It requires that all keyspaces are RF-rack-valid. " \
f"That condition is violated: keyspace '{ks}' doesn't satisfy it for DC 'dc{failed_dc}': RF={rf} vs. rack count={rack_count}."
with pytest.raises((ConfigurationException, InvalidRequest), match=err):
with pytest.raises((ConfigurationException, InvalidRequest)):
await alter_ok(ks, rfs)
# Step 1.
@@ -289,14 +286,16 @@ async def test_create_and_alter_keyspace_with_altering_rf_and_racks(manager: Man
for task in tasks:
_ = tg.create_task(task)
await alter_ok(ks1, [2, 1])
# Altering from rack list to numeric not supported.
await alter_fail(ks1, [2, 1], 1, 2)
# await alter_ok(ks1, [2, 1])
await alter_fail(ks1, [2, 2], 2, 1)
await alter_ok(ks2, [2, 1])
await alter_fail(ks2, [2, 1], 1, 2)
await alter_ok(ks3, [2, 1])
await alter_ok(ks4, [2, 1])
await alter_fail(ks4, [2, 1], 1, 2)
# RF = 1 is always OK!
await alter_ok(ks3, [1, 1])
await alter_fail(ks3, [1, 1], 1, 2)
@pytest.mark.asyncio
async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
@@ -350,6 +349,7 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
valid_keyspaces = [
create_ok([0, 0]),
create_ok([1, 0]),
create_ok([2, 0]),
create_ok([3, 0]),
create_ok(0)
]
@@ -358,7 +358,6 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
# because then we can't predict what error will say.
invalid_keyspaces = [
create_fail([4, 0], 1, 4, 3),
create_fail([2, 0], 1, 2, 3),
create_fail([0, 1], 2, 1, 0),
create_fail([0, 2], 2, 2, 0),
create_fail([0, 3], 2, 3, 0),