Merge 'cql: improve validating RF's change in ALTER tablets KS' from Piotr Smaron

This patch series fixes a couple of bugs in the validation that the replication factor (RF) is not changed by too much when performing ALTER on a tablets-based keyspace.
The total RF change cannot exceed 1, because the tablets load balancer cannot handle more work at once.

Fixes: #20039

Should be backported to 6.0 & 6.1 (wherever tablets feature is present), as this bug may break the cluster.

Closes scylladb/scylladb#20208

* github.com:scylladb/scylladb:
  cql: sum of abs RFs diffs cannot exceed 1 in ALTER tablets KS
  cql: join new and old KS options in ALTER tablets KS
  cql: fix validation of ALTERing RFs in tablets KS
  cql: harden `alter_keyspace_statement.cc::validate_rf_difference`
  cql: validate RF change for new DCs in ALTER tablets KS
  cql: extend test_alter_tablet_keyspace_rf
  cql: refactor test_tablets::test_alter_tablet_keyspace
  cql: remove unused helper function from test_tablets
This commit is contained in:
Kamil Braun
2024-10-08 14:33:45 +02:00
6 changed files with 159 additions and 47 deletions

View File

@@ -11,6 +11,7 @@
#include <boost/range/algorithm.hpp>
#include <fmt/format.h>
#include <seastar/core/coroutine.hh>
#include <seastar/core/on_internal_error.hh>
#include <stdexcept>
#include "alter_keyspace_statement.hh"
#include "prepared_statement.hh"
@@ -43,18 +44,16 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo
return state.has_keyspace_access(_name, auth::permission::ALTER);
}
static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) {
auto to_number = [] (const std::string_view rf) {
int result;
// We assume the passed string view represents a valid decimal number,
// so we don't need the error code.
(void) std::from_chars(rf.begin(), rf.end(), result);
return result;
};
// We want to ensure that each DC's RF is going to change by at most 1
// because in that case the old and new quorums must overlap.
return std::abs(to_number(curr_rf) - to_number(new_rf)) <= 1;
// Returns |curr_rf - new_rf| where both RFs are given as decimal strings.
// Both arguments must parse as integers fitting into `int`; anything else
// indicates a bug upstream (RF options are validated before reaching this
// point), so we fail with an internal error rather than a user-facing one.
static unsigned get_abs_rf_diff(const std::string& curr_rf, const std::string& new_rf) {
    try {
        // Widen to 64 bits before subtracting: std::stoi() guarantees each
        // value fits in an `int`, but the difference of two `int`s can still
        // overflow `int`, which would be undefined behavior.
        const auto diff = static_cast<long long>(std::stoi(curr_rf)) - static_cast<long long>(std::stoi(new_rf));
        return static_cast<unsigned>(diff < 0 ? -diff : diff);
    } catch (std::invalid_argument const& ex) {
        on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments, "
                "but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
    } catch (std::out_of_range const& ex) {
        on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments to fit into `int` type, "
                "but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
    }
}
void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const {
@@ -84,11 +83,24 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());
if (ks.get_replication_strategy().uses_tablets()) {
const std::map<sstring, sstring>& current_rfs = ks.metadata()->strategy_options();
for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) {
auto it = current_rfs.find(new_dc);
if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) {
throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time.");
const std::map<sstring, sstring>& current_rf_per_dc = ks.metadata()->strategy_options();
auto new_rf_per_dc = _attrs->get_replication_options();
new_rf_per_dc.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
unsigned total_abs_rfs_diff = 0;
for (const auto& [new_dc, new_rf] : new_rf_per_dc) {
sstring old_rf = "0";
if (auto new_dc_in_current_mapping = current_rf_per_dc.find(new_dc);
new_dc_in_current_mapping != current_rf_per_dc.end()) {
old_rf = new_dc_in_current_mapping->second;
} else if (!qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters().contains(new_dc)) {
// This means that the DC listed in ALTER doesn't exist. This error will be reported later,
// during validation in abstract_replication_strategy::validate_replication_strategy.
// We can't report this error now, because it'd change the order of errors reported:
// first we need to report non-existing DCs, then if RFs aren't changed by too much.
continue;
}
if (total_abs_rfs_diff += get_abs_rf_diff(old_rf, new_rf); total_abs_rfs_diff >= 2) {
throw exceptions::invalid_request_exception("Only one DC's RF can be changed at a time and not by more than 1");
}
}
}
@@ -118,6 +130,63 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
return ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty();
}
namespace {
// These functions are used to flatten all the options in the keyspace definition into a single-level map<string, string>.
// (Currently options are stored in a nested structure that looks more like a map<string, map<string, string>>).
// Flattening is simply joining the keys of maps from both levels with a colon ':' character,
// or in other words: prefixing the keys in the output map with the option type, e.g. 'replication', 'storage', etc.,
// so that the output map contains entries like: "replication:dc1" -> "3".
// This is done to avoid key conflicts and to be able to de-flatten the map back into the original structure.

// Copies every entry of `in` into `out`, rewriting each key to "<prefix>:<key>".
void add_prefixed_key(const sstring& prefix, const std::map<sstring, sstring>& in, std::map<sstring, sstring>& out) {
    for (const auto& [in_key, in_value]: in) {
        out[prefix + ":" + in_key] = in_value;
    }
}

// Flattens the options specified in the ALTER KS statement (`ks`).
// `feat` is currently unused here; it is kept so the signature mirrors
// ks_prop_defs::as_ks_metadata_update() and other feature-aware callers.
std::map<sstring, sstring> get_current_options_flattened(const shared_ptr<cql3::statements::ks_prop_defs>& ks,
        bool include_tablet_options,
        const gms::feature_service& feat) {
    std::map<sstring, sstring> all_options;
    add_prefixed_key(ks->KW_REPLICATION, ks->get_replication_options(), all_options);
    add_prefixed_key(ks->KW_STORAGE, ks->get_storage_options().to_map(), all_options);
    // if no tablet options are specified in ALTER KS statement,
    // we want to preserve the old ones and hence cannot overwrite them with defaults
    if (include_tablet_options) {
        auto initial_tablets = ks->get_initial_tablets(std::nullopt);
        add_prefixed_key(ks->KW_TABLETS,
                {{"enabled", initial_tablets ? "true" : "false"},
                 {"initial", std::to_string(initial_tablets.value_or(0))}},
                all_options);
    }
    add_prefixed_key(ks->KW_DURABLE_WRITES,
            {{sstring(ks->KW_DURABLE_WRITES), to_sstring(ks->get_boolean(ks->KW_DURABLE_WRITES, true))}},
            all_options);
    return all_options;
}

// Flattens the options of the existing keyspace (`ks`), i.e. the state
// before the ALTER, using the same key scheme as get_current_options_flattened()
// so the two maps can be merged.
std::map<sstring, sstring> get_old_options_flattened(const data_dictionary::keyspace& ks, bool include_tablet_options) {
    std::map<sstring, sstring> all_options;
    using namespace cql3::statements;
    add_prefixed_key(ks_prop_defs::KW_REPLICATION, ks.get_replication_strategy().get_config_options(), all_options);
    add_prefixed_key(ks_prop_defs::KW_STORAGE, ks.metadata()->get_storage_options().to_map(), all_options);
    if (include_tablet_options) {
        add_prefixed_key(ks_prop_defs::KW_TABLETS,
                {{"enabled", ks.metadata()->initial_tablets() ? "true" : "false"},
                 {"initial", std::to_string(ks.metadata()->initial_tablets().value_or(0))}},
                all_options);
    }
    add_prefixed_key(ks_prop_defs::KW_DURABLE_WRITES,
            {{sstring(ks_prop_defs::KW_DURABLE_WRITES), to_sstring(ks.metadata()->durable_writes())}},
            all_options);
    return all_options;
}
} // <anonymous> namespace
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
using namespace cql_transport;
@@ -130,11 +199,18 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat);
std::vector<mutation> muts;
std::vector<sstring> warnings;
auto ks_options = _attrs->get_all_options_flattened(feat);
bool include_tablet_options = _attrs->get_map(_attrs->KW_TABLETS).has_value();
auto old_ks_options = get_old_options_flattened(ks, include_tablet_options);
auto ks_options = get_current_options_flattened(_attrs, include_tablet_options, feat);
ks_options.merge(old_ks_options);
auto ts = mc.write_timestamp();
auto global_request_id = mc.new_group0_state_id();
// we only want to run the tablets path if there are actually any tablets changes, not only schema changes
// TODO: the current `if (changes_tablets(qp))` is insufficient: someone may set the same RFs as before,
// and we'll unnecessarily trigger the processing path for ALTER tablets KS,
// when in reality nothing or only schema is being changed
if (changes_tablets(qp)) {
if (!qp.topology_global_queue_empty()) {
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(

View File

@@ -185,22 +185,6 @@ bool ks_prop_defs::get_durable_writes() const {
return get_boolean(KW_DURABLE_WRITES, true);
}
// Flattens all option groups of this ks_prop_defs into a single-level map,
// prefixing each key with its option-group name separated by ':'
// (e.g. "replication:dc1" -> "3").
// NOTE(review): `feat` is not used anywhere in this body — presumably kept
// for interface symmetry with other feature-aware APIs; confirm with callers.
std::map<sstring, sstring> ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const {
std::map<sstring, sstring> all_options;
// Local helper: copy `options` into all_options under "<prefix>:<key>" keys.
auto ingest_flattened_options = [&all_options](const std::map<sstring, sstring>& options, const sstring& prefix) {
for (auto& option: options) {
all_options[prefix + ":" + option.first] = option.second;
}
};
ingest_flattened_options(get_replication_options(), KW_REPLICATION);
ingest_flattened_options(get_storage_options().to_map(), KW_STORAGE);
// Tablets options may be absent entirely; fall back to an empty map.
ingest_flattened_options(get_map(KW_TABLETS).value_or(std::map<sstring, sstring>{}), KW_TABLETS);
// durable_writes is a single boolean; defaults to true when unspecified.
ingest_flattened_options({{sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}}, KW_DURABLE_WRITES);
return all_options;
}
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) {
auto sc = get_replication_strategy_class().value();
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0 for N.T.S. only

View File

@@ -58,7 +58,6 @@ public:
std::optional<unsigned> get_initial_tablets(std::optional<unsigned> default_value) const;
data_dictionary::storage_options get_storage_options() const;
bool get_durable_writes() const;
std::map<sstring, sstring> get_all_options_flattened(const gms::feature_service& feat) const;
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&);
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
};

View File

@@ -46,14 +46,14 @@ public:
protected:
std::optional<sstring> get_simple(const sstring& name) const;
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
void remove_from_map_if_exists(const sstring& name, const sstring& key) const;
public:
bool has_property(const sstring& name) const;
std::optional<value_type> get(const sstring& name) const;
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
sstring get_string(sstring key, sstring default_value) const;
// Return a property value, typed as a Boolean

View File

@@ -299,22 +299,27 @@ def test_lwt_support_with_tablets(cql, test_keyspace, skip_without_tablets):
# We want to ensure that we can only change the RF of any DC by at most 1 at a time
# if we use tablets. That provides us with the guarantee that the old and the new QUORUM
# overlap by at least one node.
def test_alter_tablet_keyspace(cql, this_dc):
def test_alter_tablet_keyspace_rf(cql, this_dc):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }} "
f"AND TABLETS = {{ 'enabled': true, 'initial': 128 }}") as keyspace:
def change_opt_rf(rf_opt, new_rf):
cql.execute(f"ALTER KEYSPACE {keyspace} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}")
cql.execute(f"ALTER KEYSPACE {keyspace} "
f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}")
def change_dc_rf(new_rf):
change_opt_rf(this_dc, new_rf)
def change_default_rf(new_rf):
change_opt_rf("replication_factor", new_rf)
change_dc_rf(2)
change_dc_rf(3)
change_dc_rf(2) # increase RF by 1 should be OK
change_dc_rf(3) # increase RF by 1 again should be OK
change_dc_rf(3) # setting the same RF shouldn't cause problems
change_dc_rf(4) # increase RF by 1 again should be OK
change_dc_rf(3) # decrease RF by 1 should be OK
with pytest.raises(InvalidRequest):
change_dc_rf(5)
change_dc_rf(5) # increase RF by 2 should fail
with pytest.raises(InvalidRequest):
change_dc_rf(1)
change_dc_rf(1) # decrease RF by 2 should fail
with pytest.raises(InvalidRequest):
change_dc_rf(10)
change_dc_rf(10) # increase RF by 2+ should fail
with pytest.raises(InvalidRequest):
change_dc_rf(0) # decrease RF by 2+ should fail

View File

@@ -3,7 +3,7 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from cassandra.protocol import ConfigurationException
from cassandra.protocol import ConfigurationException, InvalidRequest
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError, read_barrier
@@ -190,6 +190,54 @@ async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClien
await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS(test.test) WHERE pk={k}", host=host[0])
# ALTER tablets KS cannot change RF of any DC by more than 1 at a time.
# In a multi-dc environment, we can create replicas in a DC that didn't have replicas before,
# but the above requirement should still be honoured, because we'd be changing RF from 0 to N in the new DC.
# Reproduces https://github.com/scylladb/scylladb/issues/20039#issuecomment-2271365060
# See also cql-pytest/test_tablets.py::test_alter_tablet_keyspace_rf for the basic single-DC scenarios.
@pytest.mark.asyncio
async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Verify that ALTER KEYSPACE on a tablets keyspace rejects any RF change
    whose total absolute difference across all DCs exceeds 1, including DCs
    that previously had no replicas (implicit RF 0)."""
    config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "enable_tablets": "true"}
    logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
    # we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC
    await manager.servers_add(2, config=config, property_file={'dc': 'dc1', 'rack': 'myrack'})
    await manager.servers_add(2, config=config, property_file={'dc': 'dc2', 'rack': 'myrack'})
    cql = manager.get_cql()
    await cql.run_async("create keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}")
    # need to create a table to not change only the schema, but also tablets replicas
    await cql.run_async("create table ks.t (pk int primary key)")
    with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
        # changing RF of dc2 from 0 to 2 should fail
        await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 2}")
    # changing RF of dc2 from 0 to 1 should succeed
    await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 1}")
    # ensure that RFs of both DCs are equal to 1 now, i.e. that omitting dc1 in above command didn't change it
    res = await cql.run_async("SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'")
    assert res[0].replication['dc1'] == '1'
    assert res[0].replication['dc2'] == '1'
    # incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
    with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
        await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}")
    # as above, but decrementing
    with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
        await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 0}")
    # as above, but decrement 1 RF and increment the other
    with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
        await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 0}")
    # as above, but RFs are swapped
    with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
        await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 2}")
    # check that we can remove all replicas from dc2 by changing RF from 1 to 0
    await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 0}")
    # check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
    await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0}")
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
# Check that an existing cached read, will be cleaned up when the tablet it reads
# from is migrated away.