Compare commits
6 Commits
branch-6.2
...
next-6.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
98a6002a1a | ||
|
|
9f1ed14f9d | ||
|
|
3bd3d720e0 | ||
|
|
3d6d3484b5 | ||
|
|
db1d3cc342 | ||
|
|
66301eb2b6 |
@@ -43,6 +43,10 @@ future<> maintenance_socket_role_manager::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::ensure_superuser_is_created() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
template<typename T = void>
|
||||
future<T> operation_not_supported_exception(std::string_view operation) {
|
||||
return make_exception_future<T>(
|
||||
|
||||
@@ -39,6 +39,8 @@ public:
|
||||
|
||||
virtual future<> stop() override;
|
||||
|
||||
virtual future<> ensure_superuser_is_created() override;
|
||||
|
||||
virtual future<> create(std::string_view role_name, const role_config&, ::service::group0_batch&) override;
|
||||
|
||||
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
|
||||
|
||||
@@ -106,6 +106,13 @@ public:
|
||||
|
||||
virtual future<> stop() = 0;
|
||||
|
||||
///
|
||||
/// Ensure that superuser role exists.
|
||||
///
|
||||
/// \returns a future once it is ensured that the superuser role exists.
|
||||
///
|
||||
virtual future<> ensure_superuser_is_created() = 0;
|
||||
|
||||
///
|
||||
/// \returns an exceptional future with \ref role_already_exists for a role that has previously been created.
|
||||
///
|
||||
|
||||
@@ -257,6 +257,10 @@ future<> service::stop() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> service::ensure_superuser_is_created() {
|
||||
return _role_manager->ensure_superuser_is_created();
|
||||
}
|
||||
|
||||
void service::update_cache_config() {
|
||||
auto db = _qp.db();
|
||||
|
||||
|
||||
@@ -131,6 +131,8 @@ public:
|
||||
|
||||
future<> stop();
|
||||
|
||||
future<> ensure_superuser_is_created();
|
||||
|
||||
void update_cache_config();
|
||||
|
||||
void reset_authorization_cache();
|
||||
|
||||
@@ -241,35 +241,39 @@ future<> standard_role_manager::migrate_legacy_metadata() {
|
||||
}
|
||||
|
||||
future<> standard_role_manager::start() {
|
||||
return once_among_shards([this] {
|
||||
return futurize_invoke([this] () {
|
||||
if (legacy_mode(_qp)) {
|
||||
return create_legacy_metadata_tables_if_missing();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).then([this] {
|
||||
_stopped = auth::do_after_system_ready(_as, [this] {
|
||||
return seastar::async([this] {
|
||||
if (legacy_mode(_qp)) {
|
||||
_migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get();
|
||||
return once_among_shards([this] () -> future<> {
|
||||
if (legacy_mode(_qp)) {
|
||||
co_await create_legacy_metadata_tables_if_missing();
|
||||
}
|
||||
|
||||
if (any_nondefault_role_row_satisfies(_qp, &has_can_login).get()) {
|
||||
if (legacy_metadata_exists()) {
|
||||
log.warn("Ignoring legacy user metadata since nondefault roles already exist.");
|
||||
}
|
||||
auto handler = [this] () -> future<> {
|
||||
const bool legacy = legacy_mode(_qp);
|
||||
if (legacy) {
|
||||
if (!_superuser_created_promise.available()) {
|
||||
_superuser_created_promise.set_value();
|
||||
}
|
||||
co_await _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (legacy_metadata_exists()) {
|
||||
migrate_legacy_metadata().get();
|
||||
return;
|
||||
}
|
||||
if (co_await any_nondefault_role_row_satisfies(_qp, &has_can_login)) {
|
||||
if (legacy_metadata_exists()) {
|
||||
log.warn("Ignoring legacy user metadata since nondefault roles already exist.");
|
||||
}
|
||||
create_default_role_if_missing().get();
|
||||
});
|
||||
});
|
||||
});
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (legacy_metadata_exists()) {
|
||||
co_await migrate_legacy_metadata();
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
co_await create_default_role_if_missing();
|
||||
if (!legacy) {
|
||||
_superuser_created_promise.set_value();
|
||||
}
|
||||
};
|
||||
|
||||
_stopped = auth::do_after_system_ready(_as, handler);
|
||||
co_return;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -278,6 +282,11 @@ future<> standard_role_manager::stop() {
|
||||
return _stopped.handle_exception_type([] (const sleep_aborted&) { }).handle_exception_type([](const abort_requested_exception&) {});;
|
||||
}
|
||||
|
||||
future<> standard_role_manager::ensure_superuser_is_created() {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
return _superuser_created_promise.get_shared_future();
|
||||
}
|
||||
|
||||
future<> standard_role_manager::create_or_replace(std::string_view role_name, const role_config& c, ::service::group0_batch& mc) {
|
||||
const sstring query = seastar::format("INSERT INTO {}.{} ({}, is_superuser, can_login) VALUES (?, ?, ?)",
|
||||
get_auth_ks_name(_qp),
|
||||
|
||||
@@ -37,6 +37,7 @@ class standard_role_manager final : public role_manager {
|
||||
future<> _stopped;
|
||||
abort_source _as;
|
||||
std::string _superuser;
|
||||
shared_promise<> _superuser_created_promise;
|
||||
|
||||
public:
|
||||
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
|
||||
@@ -49,6 +50,8 @@ public:
|
||||
|
||||
virtual future<> stop() override;
|
||||
|
||||
virtual future<> ensure_superuser_is_created() override;
|
||||
|
||||
virtual future<> create(std::string_view role_name, const role_config&, ::service::group0_batch&) override;
|
||||
|
||||
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
|
||||
|
||||
@@ -1120,7 +1120,10 @@ future<> compaction_manager::drain() {
|
||||
cmlog.info("Asked to drain");
|
||||
if (*_early_abort_subscription) {
|
||||
_state = state::disabled;
|
||||
_compaction_submission_timer.cancel();
|
||||
co_await stop_ongoing_compactions("drain");
|
||||
// Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
|
||||
reevaluate_postponed_compactions();
|
||||
}
|
||||
cmlog.info("Drained");
|
||||
}
|
||||
|
||||
1
main.cc
1
main.cc
@@ -2133,6 +2133,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
ss.local().drain_on_shutdown().get();
|
||||
});
|
||||
|
||||
auth_service.local().ensure_superuser_is_created().get();
|
||||
ss.local().register_protocol_server(cql_server_ctl, cfg->start_native_transport()).get();
|
||||
api::set_transport_controller(ctx, cql_server_ctl).get();
|
||||
auto stop_transport_controller = defer_verbose_shutdown("transport controller API", [&ctx] {
|
||||
|
||||
@@ -1454,3 +1454,13 @@ SEASTAR_TEST_CASE(mutation_dump_generated_schema_deterministic_id_version) {
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(enable_drained_compaction_manager) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.db().invoke_on_all([] (replica::database& db) -> future<> {
|
||||
auto& cm = db.get_compaction_manager();
|
||||
co_await cm.drain();
|
||||
cm.enable();
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
34
test/topology_experimental_raft/test_restart_cluster.py
Normal file
34
test/topology_experimental_raft/test_restart_cluster.py
Normal file
@@ -0,0 +1,34 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
"""
|
||||
Test clusters can restart fine after all nodes are stopped gracefully
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_restart_cluster(manager: ManagerClient) -> None:
|
||||
"""Test that cluster can restart fine after all nodes are stopped gracefully"""
|
||||
servers = await manager.servers_add(3)
|
||||
cql = manager.get_cql()
|
||||
|
||||
logger.info(f"Servers {servers}, gracefully stopping servers {[s.server_id for s in servers]} to check if all will go up")
|
||||
for s in servers:
|
||||
await manager.server_stop_gracefully(s.server_id)
|
||||
|
||||
logger.info(f"Starting servers {[s.server_id for s in servers]}")
|
||||
for s in servers:
|
||||
await manager.server_start(s.server_id)
|
||||
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
Reference in New Issue
Block a user