cql3: reject concurrent alter of the same keyspace

Reject ALTER KEYSPACE request if there is unfinished (queued, pending,
or paused) alter request of the same keyspace.

This is required as in the following changes, global request queue
will contain rf change requests meant to be resumed.
This commit is contained in:
Aleksandra Martyniuk
2025-12-08 17:20:10 +01:00
parent b3a0e4c2dc
commit 2e7ba1f8ce
6 changed files with 53 additions and 4 deletions

View File

@@ -64,6 +64,10 @@ bool query_processor::topology_global_queue_empty() {
return remote().first.get().ss.topology_global_queue_empty();
}
future<bool> query_processor::ongoing_rf_change(const service::group0_guard& guard, sstring ks) {
return remote().first.get().ss.ongoing_rf_change(guard, std::move(ks));
}
static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}

View File

@@ -474,6 +474,7 @@ public:
void reset_cache();
bool topology_global_queue_empty();
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
query_options make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,

View File

@@ -19,6 +19,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "mutation/canonical_mutation.hh"
#include "prepared_statement.hh"
#include "seastar/coroutine/exception.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
@@ -138,6 +139,7 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
using namespace cql_transport;
bool unknown_keyspace = false;
try {
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
auto ks = qp.db().find_keyspace(_name);
@@ -158,8 +160,12 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
// when in reality nothing or only schema is being changed
if (changes_tablets(qp)) {
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
co_await coroutine::return_exception(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
if (qp.proxy().features().rack_list_rf && co_await qp.ongoing_rf_change(mc.guard(),_name)) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception(format("Another RF change for this keyspace {} ongoing, please retry.", _name)));
}
qp.db().real_database().validate_keyspace_update(*ks_md_update);
@@ -242,10 +248,15 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
target_type,
keyspace());
mc.add_mutations(std::move(muts), "CQL alter keyspace");
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), warnings));
co_return std::make_tuple(std::move(ret), warnings);
} catch (data_dictionary::no_such_keyspace& e) {
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
unknown_keyspace = true;
}
if (unknown_keyspace) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception("Unknown keyspace " + _name));
}
std::unreachable();
}
std::unique_ptr<cql3::statements::prepared_statement>

View File

@@ -254,6 +254,10 @@ public:
group0_batch(const group0_batch&) = delete;
group0_batch(group0_batch&&) = default;
const group0_guard& guard() const {
return _guard.value();
}
// Gets timestamp which should be used when building mutations.
api::timestamp_type write_timestamp() const;
utils::UUID new_group0_state_id() const;

View File

@@ -1375,6 +1375,34 @@ public:
}
};
future<bool> storage_service::ongoing_rf_change(const group0_guard& guard, sstring ks) const {
auto ongoing_ks_rf_change = [&] (utils::UUID request_id) -> future<bool> {
auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
co_return std::holds_alternative<global_topology_request>(req_entry.request_type) &&
std::get<global_topology_request>(req_entry.request_type) == global_topology_request::keyspace_rf_change &&
req_entry.new_keyspace_rf_change_ks_name.has_value() && req_entry.new_keyspace_rf_change_ks_name.value() == ks;
};
if (_topology_state_machine._topology.global_request_id.has_value()) {
auto req_id = _topology_state_machine._topology.global_request_id.value();
if (co_await ongoing_ks_rf_change(req_id)) {
co_return true;
}
}
for (auto request_id : _topology_state_machine._topology.paused_rf_change_requests) {
if (co_await ongoing_ks_rf_change(request_id)) {
co_return true;
}
co_await coroutine::maybe_yield();
}
for (auto request_id : _topology_state_machine._topology.global_requests_queue) {
if (co_await ongoing_ks_rf_change(request_id)) {
co_return true;
}
co_await coroutine::maybe_yield();
}
co_return false;
}
future<> storage_service::raft_initialize_discovery_leader(const join_node_request_params& params) {
if (params.replaced_id.has_value()) {
throw std::runtime_error(::format("Cannot perform a replace operation because this is the first node in the cluster"));

View File

@@ -931,6 +931,7 @@ public:
bool topology_global_queue_empty() const {
return !_topology_state_machine._topology.global_request.has_value();
}
future<bool> ongoing_rf_change(const group0_guard& guard, sstring ks) const;
future<> raft_initialize_discovery_leader(const join_node_request_params& params);
future<> initialize_done_topology_upgrade_state();
private: