Merge 'cql: improve validating RF's change in ALTER tablets KS' from Piotr Smaron
This patch series fixes a couple of bugs around validating if RF is not changed by too much when performing ALTER tablets KS. RF cannot change by more than 1 in total, because tablets load balancer cannot handle more work at once. Fixes: #20039 Should be backported to 6.0 & 6.1 (wherever tablets feature is present), as this bug may break the cluster. Closes scylladb/scylladb#20208 * github.com:scylladb/scylladb: cql: sum of abs RFs diffs cannot exceed 1 in ALTER tablets KS cql: join new and old KS options in ALTER tablets KS cql: fix validation of ALTERing RFs in tablets KS cql: harden `alter_keyspace_statement.cc::validate_rf_difference` cql: validate RF change for new DCs in ALTER tablets KS cql: extend test_alter_tablet_keyspace_rf cql: refactor test_tablets::test_alter_tablet_keyspace cql: remove unused helper function from test_tablets
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
#include <boost/range/algorithm.hpp>
|
||||
#include <fmt/format.h>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <stdexcept>
|
||||
#include "alter_keyspace_statement.hh"
|
||||
#include "prepared_statement.hh"
|
||||
@@ -43,18 +44,16 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo
|
||||
return state.has_keyspace_access(_name, auth::permission::ALTER);
|
||||
}
|
||||
|
||||
static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) {
|
||||
auto to_number = [] (const std::string_view rf) {
|
||||
int result;
|
||||
// We assume the passed string view represents a valid decimal number,
|
||||
// so we don't need the error code.
|
||||
(void) std::from_chars(rf.begin(), rf.end(), result);
|
||||
return result;
|
||||
};
|
||||
|
||||
// We want to ensure that each DC's RF is going to change by at most 1
|
||||
// because in that case the old and new quorums must overlap.
|
||||
return std::abs(to_number(curr_rf) - to_number(new_rf)) <= 1;
|
||||
static unsigned get_abs_rf_diff(const std::string& curr_rf, const std::string& new_rf) {
|
||||
try {
|
||||
return std::abs(std::stoi(curr_rf) - std::stoi(new_rf));
|
||||
} catch (std::invalid_argument const& ex) {
|
||||
on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments, "
|
||||
"but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
|
||||
} catch (std::out_of_range const& ex) {
|
||||
on_internal_error(mylogger, fmt::format("get_abs_rf_diff expects integer arguments to fit into `int` type, "
|
||||
"but got curr_rf:{} and new_rf:{}", curr_rf, new_rf));
|
||||
}
|
||||
}
|
||||
|
||||
void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const {
|
||||
@@ -84,11 +83,24 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
|
||||
auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());
|
||||
|
||||
if (ks.get_replication_strategy().uses_tablets()) {
|
||||
const std::map<sstring, sstring>& current_rfs = ks.metadata()->strategy_options();
|
||||
for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) {
|
||||
auto it = current_rfs.find(new_dc);
|
||||
if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) {
|
||||
throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time.");
|
||||
const std::map<sstring, sstring>& current_rf_per_dc = ks.metadata()->strategy_options();
|
||||
auto new_rf_per_dc = _attrs->get_replication_options();
|
||||
new_rf_per_dc.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
unsigned total_abs_rfs_diff = 0;
|
||||
for (const auto& [new_dc, new_rf] : new_rf_per_dc) {
|
||||
sstring old_rf = "0";
|
||||
if (auto new_dc_in_current_mapping = current_rf_per_dc.find(new_dc);
|
||||
new_dc_in_current_mapping != current_rf_per_dc.end()) {
|
||||
old_rf = new_dc_in_current_mapping->second;
|
||||
} else if (!qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters().contains(new_dc)) {
|
||||
// This means that the DC listed in ALTER doesn't exist. This error will be reported later,
|
||||
// during validation in abstract_replication_strategy::validate_replication_strategy.
|
||||
// We can't report this error now, because it'd change the order of errors reported:
|
||||
// first we need to report non-existing DCs, then if RFs aren't changed by too much.
|
||||
continue;
|
||||
}
|
||||
if (total_abs_rfs_diff += get_abs_rf_diff(old_rf, new_rf); total_abs_rfs_diff >= 2) {
|
||||
throw exceptions::invalid_request_exception("Only one DC's RF can be changed at a time and not by more than 1");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -118,6 +130,63 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
|
||||
return ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty();
|
||||
}
|
||||
|
||||
namespace {
|
||||
// These functions are used to flatten all the options in the keyspace definition into a single-level map<string, string>.
|
||||
// (Currently options are stored in a nested structure that looks more like a map<string, map<string, string>>).
|
||||
// Flattening is simply joining the keys of maps from both levels with a colon ':' character,
|
||||
// or in other words: prefixing the keys in the output map with the option type, e.g. 'replication', 'storage', etc.,
|
||||
// so that the output map contains entries like: "replication:dc1" -> "3".
|
||||
// This is done to avoid key conflicts and to be able to de-flatten the map back into the original structure.
|
||||
|
||||
void add_prefixed_key(const sstring& prefix, const std::map<sstring, sstring>& in, std::map<sstring, sstring>& out) {
|
||||
for (const auto& [in_key, in_value]: in) {
|
||||
out[prefix + ":" + in_key] = in_value;
|
||||
}
|
||||
};
|
||||
|
||||
std::map<sstring, sstring> get_current_options_flattened(const shared_ptr<cql3::statements::ks_prop_defs>& ks,
|
||||
bool include_tablet_options,
|
||||
const gms::feature_service& feat) {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
add_prefixed_key(ks->KW_REPLICATION, ks->get_replication_options(), all_options);
|
||||
add_prefixed_key(ks->KW_STORAGE, ks->get_storage_options().to_map(), all_options);
|
||||
// if no tablet options are specified in ATLER KS statement,
|
||||
// we want to preserve the old ones and hence cannot overwrite them with defaults
|
||||
if (include_tablet_options) {
|
||||
auto initial_tablets = ks->get_initial_tablets(std::nullopt);
|
||||
add_prefixed_key(ks->KW_TABLETS,
|
||||
{{"enabled", initial_tablets ? "true" : "false"},
|
||||
{"initial", std::to_string(initial_tablets.value_or(0))}},
|
||||
all_options);
|
||||
}
|
||||
add_prefixed_key(ks->KW_DURABLE_WRITES,
|
||||
{{sstring(ks->KW_DURABLE_WRITES), to_sstring(ks->get_boolean(ks->KW_DURABLE_WRITES, true))}},
|
||||
all_options);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> get_old_options_flattened(const data_dictionary::keyspace& ks, bool include_tablet_options) {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
using namespace cql3::statements;
|
||||
add_prefixed_key(ks_prop_defs::KW_REPLICATION, ks.get_replication_strategy().get_config_options(), all_options);
|
||||
add_prefixed_key(ks_prop_defs::KW_STORAGE, ks.metadata()->get_storage_options().to_map(), all_options);
|
||||
if (include_tablet_options) {
|
||||
add_prefixed_key(ks_prop_defs::KW_TABLETS,
|
||||
{{"enabled", ks.metadata()->initial_tablets() ? "true" : "false"},
|
||||
{"initial", std::to_string(ks.metadata()->initial_tablets().value_or(0))}},
|
||||
all_options);
|
||||
}
|
||||
add_prefixed_key(ks_prop_defs::KW_DURABLE_WRITES,
|
||||
{{sstring(ks_prop_defs::KW_DURABLE_WRITES), to_sstring(ks.metadata()->durable_writes())}},
|
||||
all_options);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
} // <anonymous> namespace
|
||||
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
|
||||
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
|
||||
using namespace cql_transport;
|
||||
@@ -130,11 +199,18 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat);
|
||||
std::vector<mutation> muts;
|
||||
std::vector<sstring> warnings;
|
||||
auto ks_options = _attrs->get_all_options_flattened(feat);
|
||||
bool include_tablet_options = _attrs->get_map(_attrs->KW_TABLETS).has_value();
|
||||
auto old_ks_options = get_old_options_flattened(ks, include_tablet_options);
|
||||
auto ks_options = get_current_options_flattened(_attrs, include_tablet_options, feat);
|
||||
ks_options.merge(old_ks_options);
|
||||
|
||||
auto ts = mc.write_timestamp();
|
||||
auto global_request_id = mc.new_group0_state_id();
|
||||
|
||||
// we only want to run the tablets path if there are actually any tablets changes, not only schema changes
|
||||
// TODO: the current `if (changes_tablets(qp))` is insufficient: someone may set the same RFs as before,
|
||||
// and we'll unnecessarily trigger the processing path for ALTER tablets KS,
|
||||
// when in reality nothing or only schema is being changed
|
||||
if (changes_tablets(qp)) {
|
||||
if (!qp.topology_global_queue_empty()) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
|
||||
|
||||
@@ -185,22 +185,6 @@ bool ks_prop_defs::get_durable_writes() const {
|
||||
return get_boolean(KW_DURABLE_WRITES, true);
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
auto ingest_flattened_options = [&all_options](const std::map<sstring, sstring>& options, const sstring& prefix) {
|
||||
for (auto& option: options) {
|
||||
all_options[prefix + ":" + option.first] = option.second;
|
||||
}
|
||||
};
|
||||
ingest_flattened_options(get_replication_options(), KW_REPLICATION);
|
||||
ingest_flattened_options(get_storage_options().to_map(), KW_STORAGE);
|
||||
ingest_flattened_options(get_map(KW_TABLETS).value_or(std::map<sstring, sstring>{}), KW_TABLETS);
|
||||
ingest_flattened_options({{sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}}, KW_DURABLE_WRITES);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) {
|
||||
auto sc = get_replication_strategy_class().value();
|
||||
// if tablets options have not been specified, but tablets are globally enabled, set the value to 0 for N.T.S. only
|
||||
|
||||
@@ -58,7 +58,6 @@ public:
|
||||
std::optional<unsigned> get_initial_tablets(std::optional<unsigned> default_value) const;
|
||||
data_dictionary::storage_options get_storage_options() const;
|
||||
bool get_durable_writes() const;
|
||||
std::map<sstring, sstring> get_all_options_flattened(const gms::feature_service& feat) const;
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
|
||||
};
|
||||
|
||||
@@ -46,14 +46,14 @@ public:
|
||||
protected:
|
||||
std::optional<sstring> get_simple(const sstring& name) const;
|
||||
|
||||
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
|
||||
|
||||
void remove_from_map_if_exists(const sstring& name, const sstring& key) const;
|
||||
public:
|
||||
bool has_property(const sstring& name) const;
|
||||
|
||||
std::optional<value_type> get(const sstring& name) const;
|
||||
|
||||
std::optional<std::map<sstring, sstring>> get_map(const sstring& name) const;
|
||||
|
||||
sstring get_string(sstring key, sstring default_value) const;
|
||||
|
||||
// Return a property value, typed as a Boolean
|
||||
|
||||
@@ -299,22 +299,27 @@ def test_lwt_support_with_tablets(cql, test_keyspace, skip_without_tablets):
|
||||
# We want to ensure that we can only change the RF of any DC by at most 1 at a time
|
||||
# if we use tablets. That provides us with the guarantee that the old and the new QUORUM
|
||||
# overlap by at least one node.
|
||||
def test_alter_tablet_keyspace(cql, this_dc):
|
||||
def test_alter_tablet_keyspace_rf(cql, this_dc):
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }} "
|
||||
f"AND TABLETS = {{ 'enabled': true, 'initial': 128 }}") as keyspace:
|
||||
def change_opt_rf(rf_opt, new_rf):
|
||||
cql.execute(f"ALTER KEYSPACE {keyspace} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}")
|
||||
cql.execute(f"ALTER KEYSPACE {keyspace} "
|
||||
f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{rf_opt}' : {new_rf} }}")
|
||||
|
||||
def change_dc_rf(new_rf):
|
||||
change_opt_rf(this_dc, new_rf)
|
||||
def change_default_rf(new_rf):
|
||||
change_opt_rf("replication_factor", new_rf)
|
||||
|
||||
change_dc_rf(2)
|
||||
change_dc_rf(3)
|
||||
change_dc_rf(2) # increase RF by 1 should be OK
|
||||
change_dc_rf(3) # increase RF by 1 again should be OK
|
||||
change_dc_rf(3) # setting the same RF shouldn't cause problems
|
||||
change_dc_rf(4) # increase RF by 1 again should be OK
|
||||
change_dc_rf(3) # decrease RF by 1 should be OK
|
||||
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(5)
|
||||
change_dc_rf(5) # increase RF by 2 should fail
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(1)
|
||||
change_dc_rf(1) # decrease RF by 2 should fail
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(10)
|
||||
change_dc_rf(10) # increase RF by 2+ should fail
|
||||
with pytest.raises(InvalidRequest):
|
||||
change_dc_rf(0) # decrease RF by 2+ should fail
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
from cassandra.protocol import ConfigurationException
|
||||
from cassandra.protocol import ConfigurationException, InvalidRequest
|
||||
from cassandra.query import SimpleStatement, ConsistencyLevel
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import HTTPError, read_barrier
|
||||
@@ -190,6 +190,54 @@ async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClien
|
||||
await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS(test.test) WHERE pk={k}", host=host[0])
|
||||
|
||||
|
||||
# ALTER tablets KS cannot change RF of any DC by more than 1 at a time.
|
||||
# In a multi-dc environment, we can create replicas in a DC that didn't have replicas before,
|
||||
# but the above requirement should still be honoured, because we'd be changing RF from 0 to N in the new DC.
|
||||
# Reproduces https://github.com/scylladb/scylladb/issues/20039#issuecomment-2271365060
|
||||
# See also cql-pytest/test_tablets.py::test_alter_tablet_keyspace_rf for basic scenarios tested
|
||||
@pytest.mark.asyncio
|
||||
async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "enable_tablets": "true"}
|
||||
|
||||
logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
|
||||
# we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC
|
||||
await manager.servers_add(2, config=config, property_file={'dc': f'dc1', 'rack': 'myrack'})
|
||||
await manager.servers_add(2, config=config, property_file={'dc': f'dc2', 'rack': 'myrack'})
|
||||
|
||||
cql = manager.get_cql()
|
||||
await cql.run_async("create keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}")
|
||||
# need to create a table to not change only the schema, but also tablets replicas
|
||||
await cql.run_async("create table ks.t (pk int primary key)")
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
# changing RF of dc2 from 0 to 2 should fail
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 2}")
|
||||
|
||||
# changing RF of dc2 from 0 to 1 should succeed
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 1}")
|
||||
# ensure that RFs of both DCs are equal to 1 now, i.e. that omitting dc1 in above command didn't change it
|
||||
res = await cql.run_async("SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'")
|
||||
assert res[0].replication['dc1'] == '1'
|
||||
assert res[0].replication['dc2'] == '1'
|
||||
|
||||
# incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}")
|
||||
# as above, but decrementing
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 0}")
|
||||
# as above, but decrement 1 RF and increment the other
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 0}")
|
||||
# as above, but RFs are swapped
|
||||
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 2}")
|
||||
|
||||
# check that we can remove all replicas from dc2 by changing RF from 1 to 0
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 0}")
|
||||
# check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
|
||||
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0}")
|
||||
|
||||
|
||||
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
|
||||
# Check that an existing cached read, will be cleaned up when the tablet it reads
|
||||
# from is migrated away.
|
||||
|
||||
Reference in New Issue
Block a user