cql3: reject concurrent alter of the same keyspace
Reject ALTER KEYSPACE request if there is unfinished (queued, pending, or paused) alter request of the same keyspace. This is required as in the following changes, global request queue will contain rf change requests meant to be resumed.
This commit is contained in:
@@ -64,6 +64,10 @@ bool query_processor::topology_global_queue_empty() {
|
||||
return remote().first.get().ss.topology_global_queue_empty();
|
||||
}
|
||||
|
||||
future<bool> query_processor::ongoing_rf_change(const service::group0_guard& guard, sstring ks) {
|
||||
return remote().first.get().ss.ongoing_rf_change(guard, std::move(ks));
|
||||
}
|
||||
|
||||
static service::query_state query_state_for_internal_call() {
|
||||
return {service::client_state::for_internal_calls(), empty_service_permit()};
|
||||
}
|
||||
|
||||
@@ -474,6 +474,7 @@ public:
|
||||
void reset_cache();
|
||||
|
||||
bool topology_global_queue_empty();
|
||||
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
|
||||
|
||||
query_options make_internal_options(
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "prepared_statement.hh"
|
||||
#include "seastar/coroutine/exception.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
@@ -138,6 +139,7 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
|
||||
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
|
||||
using namespace cql_transport;
|
||||
bool unknown_keyspace = false;
|
||||
try {
|
||||
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
|
||||
auto ks = qp.db().find_keyspace(_name);
|
||||
@@ -158,8 +160,12 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
// when in reality nothing or only schema is being changed
|
||||
if (changes_tablets(qp)) {
|
||||
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
}
|
||||
if (qp.proxy().features().rack_list_rf && co_await qp.ongoing_rf_change(mc.guard(),_name)) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception(format("Another RF change for this keyspace {} ongoing, please retry.", _name)));
|
||||
}
|
||||
qp.db().real_database().validate_keyspace_update(*ks_md_update);
|
||||
|
||||
@@ -242,10 +248,15 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
|
||||
target_type,
|
||||
keyspace());
|
||||
mc.add_mutations(std::move(muts), "CQL alter keyspace");
|
||||
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), warnings));
|
||||
co_return std::make_tuple(std::move(ret), warnings);
|
||||
} catch (data_dictionary::no_such_keyspace& e) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
|
||||
unknown_keyspace = true;
|
||||
}
|
||||
if (unknown_keyspace) {
|
||||
co_await coroutine::return_exception(
|
||||
exceptions::invalid_request_exception("Unknown keyspace " + _name));
|
||||
}
|
||||
std::unreachable();
|
||||
}
|
||||
|
||||
std::unique_ptr<cql3::statements::prepared_statement>
|
||||
|
||||
@@ -254,6 +254,10 @@ public:
|
||||
group0_batch(const group0_batch&) = delete;
|
||||
group0_batch(group0_batch&&) = default;
|
||||
|
||||
const group0_guard& guard() const {
|
||||
return _guard.value();
|
||||
}
|
||||
|
||||
// Gets timestamp which should be used when building mutations.
|
||||
api::timestamp_type write_timestamp() const;
|
||||
utils::UUID new_group0_state_id() const;
|
||||
|
||||
@@ -1375,6 +1375,34 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<bool> storage_service::ongoing_rf_change(const group0_guard& guard, sstring ks) const {
|
||||
auto ongoing_ks_rf_change = [&] (utils::UUID request_id) -> future<bool> {
|
||||
auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
|
||||
co_return std::holds_alternative<global_topology_request>(req_entry.request_type) &&
|
||||
std::get<global_topology_request>(req_entry.request_type) == global_topology_request::keyspace_rf_change &&
|
||||
req_entry.new_keyspace_rf_change_ks_name.has_value() && req_entry.new_keyspace_rf_change_ks_name.value() == ks;
|
||||
};
|
||||
if (_topology_state_machine._topology.global_request_id.has_value()) {
|
||||
auto req_id = _topology_state_machine._topology.global_request_id.value();
|
||||
if (co_await ongoing_ks_rf_change(req_id)) {
|
||||
co_return true;
|
||||
}
|
||||
}
|
||||
for (auto request_id : _topology_state_machine._topology.paused_rf_change_requests) {
|
||||
if (co_await ongoing_ks_rf_change(request_id)) {
|
||||
co_return true;
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
for (auto request_id : _topology_state_machine._topology.global_requests_queue) {
|
||||
if (co_await ongoing_ks_rf_change(request_id)) {
|
||||
co_return true;
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return false;
|
||||
}
|
||||
|
||||
future<> storage_service::raft_initialize_discovery_leader(const join_node_request_params& params) {
|
||||
if (params.replaced_id.has_value()) {
|
||||
throw std::runtime_error(::format("Cannot perform a replace operation because this is the first node in the cluster"));
|
||||
|
||||
@@ -931,6 +931,7 @@ public:
|
||||
bool topology_global_queue_empty() const {
|
||||
return !_topology_state_machine._topology.global_request.has_value();
|
||||
}
|
||||
future<bool> ongoing_rf_change(const group0_guard& guard, sstring ks) const;
|
||||
future<> raft_initialize_discovery_leader(const join_node_request_params& params);
|
||||
future<> initialize_done_topology_upgrade_state();
|
||||
private:
|
||||
|
||||
Reference in New Issue
Block a user