diff --git a/cql3/statements/alter_keyspace_statement.cc b/cql3/statements/alter_keyspace_statement.cc index ad910e4293..429b5ea21c 100644 --- a/cql3/statements/alter_keyspace_statement.cc +++ b/cql3/statements/alter_keyspace_statement.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "alter_keyspace_statement.hh" #include "prepared_statement.hh" @@ -43,18 +44,16 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo return state.has_keyspace_access(_name, auth::permission::ALTER); } -static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) { - auto to_number = [] (const std::string_view rf) { - int result; - // We assume the passed string view represents a valid decimal number, - // so we don't need the error code. - (void) std::from_chars(rf.begin(), rf.end(), result); - return result; - }; - - // We want to ensure that each DC's RF is going to change by at most 1 - // because in that case the old and new quorums must overlap. 
- return std::abs(to_number(curr_rf) - to_number(new_rf)) <= 1; +static unsigned get_abs_rf_diff(const std::string& curr_rf, const std::string& new_rf) { + try { + return std::abs(std::stoi(curr_rf) - std::stoi(new_rf)); + } catch (std::invalid_argument const& ex) { + on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments, " + "but got curr_rf:{} and new_rf:{}", curr_rf, new_rf)); + } catch (std::out_of_range const& ex) { + on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments to fit into `int` type, " + "but got curr_rf:{} and new_rf:{}", curr_rf, new_rf)); + } } void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const { @@ -84,11 +83,24 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features()); if (ks.get_replication_strategy().uses_tablets()) { - const std::map& current_rfs = ks.metadata()->strategy_options(); - for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) { - auto it = current_rfs.find(new_dc); - if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) { - throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time."); + const std::map& current_rf_per_dc = ks.metadata()->strategy_options(); + auto new_rf_per_dc = _attrs->get_replication_options(); + new_rf_per_dc.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY); + unsigned total_abs_rfs_diff = 0; + for (const auto& [new_dc, new_rf] : new_rf_per_dc) { + sstring old_rf = "0"; + if (auto new_dc_in_current_mapping = current_rf_per_dc.find(new_dc); + new_dc_in_current_mapping != current_rf_per_dc.end()) { + old_rf = new_dc_in_current_mapping->second; + } else if (!qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters().contains(new_dc)) { + 
// This means that the DC listed in ALTER doesn't exist. This error will be reported later, + // during validation in abstract_replication_strategy::validate_replication_strategy. + // We can't report this error now, because it'd change the order of errors reported: + // first we need to report non-existing DCs, then if RFs aren't changed by too much. + continue; + } + if (total_abs_rfs_diff += get_abs_rf_diff(old_rf, new_rf); total_abs_rfs_diff >= 2) { + throw exceptions::invalid_request_exception("Only one DC's RF can be changed at a time and not by more than 1"); } } } @@ -118,6 +130,63 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor return ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty(); } +namespace { +// These functions are used to flatten all the options in the keyspace definition into a single-level map. +// (Currently options are stored in a nested structure that looks more like a map>). +// Flattening is simply joining the keys of maps from both levels with a colon ':' character, +// or in other words: prefixing the keys in the output map with the option type, e.g. 'replication', 'storage', etc., +// so that the output map contains entries like: "replication:dc1" -> "3". +// This is done to avoid key conflicts and to be able to de-flatten the map back into the original structure. 
+ +void add_prefixed_key(const sstring& prefix, const std::map& in, std::map& out) { + for (const auto& [in_key, in_value]: in) { + out[prefix + ":" + in_key] = in_value; + } +}; + +std::map get_current_options_flattened(const shared_ptr& ks, + bool include_tablet_options, + const gms::feature_service& feat) { + std::map all_options; + + add_prefixed_key(ks->KW_REPLICATION, ks->get_replication_options(), all_options); + add_prefixed_key(ks->KW_STORAGE, ks->get_storage_options().to_map(), all_options); + // if no tablet options are specified in ALTER KS statement, + we want to preserve the old ones and hence cannot overwrite them with defaults + if (include_tablet_options) { + auto initial_tablets = ks->get_initial_tablets(std::nullopt); + add_prefixed_key(ks->KW_TABLETS, + {{"enabled", initial_tablets ? "true" : "false"}, + {"initial", std::to_string(initial_tablets.value_or(0))}}, + all_options); + } + add_prefixed_key(ks->KW_DURABLE_WRITES, + {{sstring(ks->KW_DURABLE_WRITES), to_sstring(ks->get_boolean(ks->KW_DURABLE_WRITES, true))}}, + all_options); + + return all_options; +} + +std::map get_old_options_flattened(const data_dictionary::keyspace& ks, bool include_tablet_options) { + std::map all_options; + + using namespace cql3::statements; + add_prefixed_key(ks_prop_defs::KW_REPLICATION, ks.get_replication_strategy().get_config_options(), all_options); + add_prefixed_key(ks_prop_defs::KW_STORAGE, ks.metadata()->get_storage_options().to_map(), all_options); + if (include_tablet_options) { + add_prefixed_key(ks_prop_defs::KW_TABLETS, + {{"enabled", ks.metadata()->initial_tablets() ? 
"true" : "false"}, + {"initial", std::to_string(ks.metadata()->initial_tablets().value_or(0))}}, + all_options); + } + add_prefixed_key(ks_prop_defs::KW_DURABLE_WRITES, + {{sstring(ks_prop_defs::KW_DURABLE_WRITES), to_sstring(ks.metadata()->durable_writes())}}, + all_options); + + return all_options; +} +} // namespace + future, cql3::cql_warnings_vec>> cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const { using namespace cql_transport; @@ -130,11 +199,18 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat); std::vector muts; std::vector warnings; - auto ks_options = _attrs->get_all_options_flattened(feat); + bool include_tablet_options = _attrs->get_map(_attrs->KW_TABLETS).has_value(); + auto old_ks_options = get_old_options_flattened(ks, include_tablet_options); + auto ks_options = get_current_options_flattened(_attrs, include_tablet_options, feat); + ks_options.merge(old_ks_options); + auto ts = mc.write_timestamp(); auto global_request_id = mc.new_group0_state_id(); // we only want to run the tablets path if there are actually any tablets changes, not only schema changes + // TODO: the current `if (changes_tablets(qp))` is insufficient: someone may set the same RFs as before, + // and we'll unnecessarily trigger the processing path for ALTER tablets KS, + // when in reality nothing or only schema is being changed if (changes_tablets(qp)) { if (!qp.topology_global_queue_empty()) { return make_exception_future, cql3::cql_warnings_vec>>( diff --git a/cql3/statements/ks_prop_defs.cc b/cql3/statements/ks_prop_defs.cc index 00e0623ad6..518916b69c 100644 --- a/cql3/statements/ks_prop_defs.cc +++ b/cql3/statements/ks_prop_defs.cc @@ -185,22 +185,6 @@ bool ks_prop_defs::get_durable_writes() const { return get_boolean(KW_DURABLE_WRITES, true); } 
-std::map ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const { - std::map all_options; - - auto ingest_flattened_options = [&all_options](const std::map& options, const sstring& prefix) { - for (auto& option: options) { - all_options[prefix + ":" + option.first] = option.second; - } - }; - ingest_flattened_options(get_replication_options(), KW_REPLICATION); - ingest_flattened_options(get_storage_options().to_map(), KW_STORAGE); - ingest_flattened_options(get_map(KW_TABLETS).value_or(std::map{}), KW_TABLETS); - ingest_flattened_options({{sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}}, KW_DURABLE_WRITES); - - return all_options; -} - lw_shared_ptr ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) { auto sc = get_replication_strategy_class().value(); // if tablets options have not been specified, but tablets are globally enabled, set the value to 0 for N.T.S. only diff --git a/cql3/statements/ks_prop_defs.hh b/cql3/statements/ks_prop_defs.hh index da0ba82913..207f24b416 100644 --- a/cql3/statements/ks_prop_defs.hh +++ b/cql3/statements/ks_prop_defs.hh @@ -58,7 +58,6 @@ public: std::optional get_initial_tablets(std::optional default_value) const; data_dictionary::storage_options get_storage_options() const; bool get_durable_writes() const; - std::map get_all_options_flattened(const gms::feature_service& feat) const; lw_shared_ptr as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&); lw_shared_ptr as_ks_metadata_update(lw_shared_ptr old, const locator::token_metadata&, const gms::feature_service&); }; diff --git a/cql3/statements/property_definitions.hh b/cql3/statements/property_definitions.hh index c115af9ebb..8e05682e32 100644 --- a/cql3/statements/property_definitions.hh +++ b/cql3/statements/property_definitions.hh @@ -46,14 +46,14 @@ public: protected: std::optional get_simple(const sstring& name) 
const; - std::optional> get_map(const sstring& name) const; - void remove_from_map_if_exists(const sstring& name, const sstring& key) const; public: bool has_property(const sstring& name) const; std::optional get(const sstring& name) const; + std::optional> get_map(const sstring& name) const; + sstring get_string(sstring key, sstring default_value) const; // Return a property value, typed as a Boolean diff --git a/test/cql-pytest/test_tablets.py b/test/cql-pytest/test_tablets.py index 64b47aabec..163469b783 100644 --- a/test/cql-pytest/test_tablets.py +++ b/test/cql-pytest/test_tablets.py @@ -299,22 +299,27 @@ def test_lwt_support_with_tablets(cql, test_keyspace, skip_without_tablets): # We want to ensure that we can only change the RF of any DC by at most 1 at a time # if we use tablets. That provides us with the guarantee that the old and the new QUORUM # overlap by at least one node. -def test_alter_tablet_keyspace(cql, this_dc): +def test_alter_tablet_keyspace_rf(cql, this_dc): with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }} " f"AND TABLETS = {{ 'enabled': true, 'initial': 128 }}") as keyspace: def change_opt_rf(rf_opt, new_rf): - cql.execute(f"ALTER KEYSPACE {keyspace} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}") + cql.execute(f"ALTER KEYSPACE {keyspace} " + f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}") + def change_dc_rf(new_rf): change_opt_rf(this_dc, new_rf) - def change_default_rf(new_rf): - change_opt_rf("replication_factor", new_rf) - change_dc_rf(2) - change_dc_rf(3) + change_dc_rf(2) # increase RF by 1 should be OK + change_dc_rf(3) # increase RF by 1 again should be OK + change_dc_rf(3) # setting the same RF shouldn't cause problems + change_dc_rf(4) # increase RF by 1 again should be OK + change_dc_rf(3) # decrease RF by 1 should be OK with pytest.raises(InvalidRequest): - change_dc_rf(5) + change_dc_rf(5) # 
increase RF by 2 should fail with pytest.raises(InvalidRequest): - change_dc_rf(1) + change_dc_rf(1) # decrease RF by 2 should fail with pytest.raises(InvalidRequest): - change_dc_rf(10) + change_dc_rf(10) # increase RF by 2+ should fail + with pytest.raises(InvalidRequest): + change_dc_rf(0) # decrease RF by 2+ should fail diff --git a/test/topology_custom/test_tablets.py b/test/topology_custom/test_tablets.py index 594769d0aa..594d6f6868 100644 --- a/test/topology_custom/test_tablets.py +++ b/test/topology_custom/test_tablets.py @@ -3,7 +3,7 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later # -from cassandra.protocol import ConfigurationException +from cassandra.protocol import ConfigurationException, InvalidRequest from cassandra.query import SimpleStatement, ConsistencyLevel from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import HTTPError, read_barrier @@ -190,6 +190,54 @@ async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClien await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS(test.test) WHERE pk={k}", host=host[0]) +# ALTER tablets KS cannot change RF of any DC by more than 1 at a time. +# In a multi-dc environment, we can create replicas in a DC that didn't have replicas before, +# but the above requirement should still be honoured, because we'd be changing RF from 0 to N in the new DC. 
+# Reproduces https://github.com/scylladb/scylladb/issues/20039#issuecomment-2271365060 +# See also cql-pytest/test_tablets.py::test_alter_tablet_keyspace_rf for basic scenarios tested +@pytest.mark.asyncio +async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager: ManagerClient) -> None: + config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "enable_tablets": "true"} + + logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC") + # we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC + await manager.servers_add(2, config=config, property_file={'dc': f'dc1', 'rack': 'myrack'}) + await manager.servers_add(2, config=config, property_file={'dc': f'dc2', 'rack': 'myrack'}) + + cql = manager.get_cql() + await cql.run_async("create keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") + # need to create a table to not change only the schema, but also tablets replicas + await cql.run_async("create table ks.t (pk int primary key)") + with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"): + # changing RF of dc2 from 0 to 2 should fail + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 2}") + + # changing RF of dc2 from 0 to 1 should succeed + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 1}") + # ensure that RFs of both DCs are equal to 1 now, i.e. 
that omitting dc1 in above command didn't change it + res = await cql.run_async("SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'") + assert res[0].replication['dc1'] == '1' + assert res[0].replication['dc2'] == '1' + + # incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas + with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"): + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}") + # as above, but decrementing + with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"): + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 0}") + # as above, but decrement 1 RF and increment the other + with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"): + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 0}") + # as above, but RFs are swapped + with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"): + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 2}") + + # check that we can remove all replicas from dc2 by changing RF from 1 to 0 + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 0}") + # check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well: + await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0}") + + # Reproducer for https://github.com/scylladb/scylladb/issues/18110 # Check that an existing cached read, will be cleaned up when the tablet it reads # from is migrated away.