Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
93fbc0a683 docs: fix typo in materialized views docs - "columns are" instead of "is"
Agent-Logs-Url: https://github.com/scylladb/scylladb/sessions/bcc29e46-1902-4ac6-9a16-4b7e3d03421a

Co-authored-by: annastuchlik <37244380+annastuchlik@users.noreply.github.com>
2026-04-27 14:19:39 +00:00
copilot-swe-agent[bot]
520466b407 Initial plan 2026-04-27 14:18:58 +00:00
24 changed files with 144 additions and 536 deletions

View File

@@ -194,36 +194,22 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
std::move(audited_keyspaces), std::move(audited_keyspaces),
std::move(audited_tables), std::move(audited_tables),
std::move(audited_categories), std::move(audited_categories),
std::cref(cfg)); std::cref(cfg))
} .then([&cfg] {
if (!audit_instance().local_is_initialized()) {
future<> audit::start_storage(const db::config& cfg) { return make_ready_future<>();
if (!audit_instance().local_is_initialized()) { }
return make_ready_future<>(); return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
} return local_audit.start(cfg);
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit._storage_helper_ptr->start(cfg).then([&local_audit] {
local_audit._storage_running = true;
}); });
}); });
} }
future<> audit::stop_storage() {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([] (audit& local_audit) {
local_audit._storage_running = false;
return local_audit._storage_helper_ptr->stop();
});
}
future<> audit::stop_audit() { future<> audit::stop_audit() {
if (!audit_instance().local_is_initialized()) { if (!audit_instance().local_is_initialized()) {
return make_ready_future<>(); return make_ready_future<>();
} }
return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) { return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) {
SCYLLA_ASSERT(!local_audit._storage_running);
return local_audit.shutdown(); return local_audit.shutdown();
}).then([] { }).then([] {
return audit::audit::audit_instance().stop(); return audit::audit::audit_instance().stop();
@@ -237,6 +223,14 @@ audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& k
return std::make_unique<audit_info>(cat, keyspace, table, batch); return std::make_unique<audit_info>(cat, keyspace, table, batch);
} }
future<> audit::start(const db::config& cfg) {
return _storage_helper_ptr->start(cfg);
}
future<> audit::stop() {
return _storage_helper_ptr->stop();
}
future<> audit::shutdown() { future<> audit::shutdown() {
return make_ready_future<>(); return make_ready_future<>();
} }
@@ -247,12 +241,6 @@ future<> audit::log(const audit_info& audit_info, const service::client_state& c
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username; const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
socket_address client_ip = client_state.get_client_address().addr(); socket_address client_ip = client_state.get_client_address().addr();
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
audit_info.query(), client_ip, audit_info.table(), username));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) { if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}", logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(), node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
@@ -298,11 +286,6 @@ future<> inspect(const audit_info_alternator& ai, const service::client_state& c
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept { future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}",
node_ip, client_ip, username, error ? "true" : "false"));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) { if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}", logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
node_ip, client_ip, username, error ? "true" : "false"); node_ip, client_ip, username, error ? "true" : "false");

View File

@@ -141,7 +141,6 @@ private:
category_set _audited_categories; category_set _audited_categories;
std::unique_ptr<storage_helper> _storage_helper_ptr; std::unique_ptr<storage_helper> _storage_helper_ptr;
bool _storage_running = false;
const db::config& _cfg; const db::config& _cfg;
utils::observer<sstring> _cfg_keyspaces_observer; utils::observer<sstring> _cfg_keyspaces_observer;
@@ -164,8 +163,6 @@ public:
return audit_instance().local(); return audit_instance().local();
} }
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm); static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> start_storage(const db::config& cfg);
static future<> stop_storage();
static future<> stop_audit(); static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false); static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false);
audit(locator::shared_token_metadata& stm, audit(locator::shared_token_metadata& stm,
@@ -177,6 +174,8 @@ public:
category_set&& audited_categories, category_set&& audited_categories,
const db::config& cfg); const db::config& cfg);
~audit(); ~audit();
future<> start(const db::config& cfg);
future<> stop();
future<> shutdown(); future<> shutdown();
bool should_log(const audit_info& audit_info) const; bool should_log(const audit_info& audit_info) const;
bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const; bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const;

View File

@@ -258,11 +258,13 @@ future<> ldap_role_manager::start() {
} catch (const seastar::sleep_aborted&) { } catch (const seastar::sleep_aborted&) {
co_return; // ignore co_return; // ignore
} }
try { co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
co_await _cache.reload_all_permissions(); try {
} catch (...) { co_await c.reload_all_permissions();
mylog.warn("Cache reload all permissions failed: {}", std::current_exception()); } catch (...) {
} mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
});
} }
}); });
return _std_mgr.start(); return _std_mgr.start();

View File

@@ -157,20 +157,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
return create_legacy_keyspace_if_missing(mm); return create_legacy_keyspace_if_missing(mm);
}); });
} }
// Authorizer must be started before the permission loader is set,
// because the loader calls _authorizer->authorize().
// The loader must be set before starting the role manager, because
// LDAP role manager starts a pruner fiber that calls
// reload_all_permissions() which asserts _permission_loader is set.
co_await _authorizer->start();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
co_await _role_manager->start(); co_await _role_manager->start();
if (this_shard_id() == 0) { if (this_shard_id() == 0) {
// Role manager and password authenticator have this odd startup // Role manager and password authenticator have this odd startup
@@ -179,19 +165,21 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
// creation therefore we need to wait here. // creation therefore we need to wait here.
co_await _role_manager->ensure_superuser_is_created(); co_await _role_manager->ensure_superuser_is_created();
} }
// Authenticator must be started after ensure_superuser_is_created() co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
// because password_authenticator queries system.roles for the if (!_used_by_maintenance_socket) {
// superuser entry created by the role manager. // Maintenance socket mode can't cache permissions because it has
co_await _authenticator->start(); // different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
} }
future<> service::stop() { future<> service::stop() {
_as.request_abort(); _as.request_abort();
// Reverse of start() order.
co_await _authenticator->stop();
co_await _role_manager->stop();
_cache.set_permission_loader(nullptr); _cache.set_permission_loader(nullptr);
co_await _authorizer->stop(); return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
} }
future<> service::ensure_superuser_is_created() { future<> service::ensure_superuser_is_created() {

View File

@@ -71,7 +71,7 @@ used. If it is used, the statement will be a no-op if the materialized view alre
MV Select Statement MV Select Statement
................... ...................
The select statement of a materialized view creation defines which of the base table is included in the view. That The select statement of a materialized view creation defines which of the base table columns are included in the view. That
statement is limited in a number of ways: statement is limited in a number of ways:
- The :ref:`selection <selection-clause>` is limited to those that only select columns of the base table. In other - The :ref:`selection <selection-clause>` is limited to those that only select columns of the base table. In other

46
main.cc
View File

@@ -1810,18 +1810,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
utils::get_local_injector().inject("stop_after_starting_migration_manager", utils::get_local_injector().inject("stop_after_starting_migration_manager",
[] { std::raise(SIGSTOP); }); [] { std::raise(SIGSTOP); });
// Audit must be constructed before the maintenance socket so
// that on shutdown (reverse destruction order) the audit service
// outlives the maintenance socket and in-flight queries can
// still reach audit::inspect() safely.
checkpoint(stop_signal, "starting audit service");
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
// XXX: stop_raft has to happen before query_processor and migration_manager // XXX: stop_raft has to happen before query_processor and migration_manager
// is stopped, since some groups keep using the query // is stopped, since some groups keep using the query
// processor until are stopped inside stop_raft. // processor until are stopped inside stop_raft.
@@ -2352,22 +2340,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get(); }).get();
stop_signal.ready(false); stop_signal.ready(false);
// At this point, `locator::topology` should be stable, i.e. we should have complete information
// about the layout of the cluster (= list of nodes along with the racks/DCs).
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(token_metadata.local().get());
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
// The table-based audit backend needs Raft (via join_cluster)
// to create its keyspace and table.
checkpoint(stop_signal, "starting audit storage");
audit::audit::start_storage(*cfg).get();
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
if (cfg->maintenance_socket() != "ignore") { if (cfg->maintenance_socket() != "ignore") {
// Enable role operations now that node joined the cluster // Enable role operations now that node joined the cluster
maintenance_auth_service.invoke_on_all([](auth::service& svc) { maintenance_auth_service.invoke_on_all([](auth::service& svc) {
@@ -2377,6 +2349,24 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server"); start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
} }
// At this point, `locator::topology` should be stable, i.e. we should have complete information
// about the layout of the cluster (= list of nodes along with the racks/DCs).
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(token_metadata.local().get());
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
// Start audit service after join_cluster so that the table-based audit backend
// can properly create its keyspace and table.
checkpoint(stop_signal, "starting audit service");
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
// Semantic validation of sstable compression parameters from config. // Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the // Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated. // required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.

View File

@@ -5466,9 +5466,10 @@ class scylla_compaction_tasks(gdb.Command):
try: try:
task_list = list(intrusive_list(cm['_tasks'])) task_list = list(intrusive_list(cm['_tasks']))
except gdb.error: # 6.2 compatibility except gdb.error: # 6.2 compatibility
task_list = [seastar_shared_ptr(t).get().dereference() for t in std_list(cm['_tasks'])] task_list = list(std_list(cm['_tasks']))
for task in task_list: for task in task_list:
task = seastar_shared_ptr(task).get().dereference()
schema = schema_ptr(task['_compacting_table'].dereference()['_schema']) schema = schema_ptr(task['_compacting_table'].dereference()['_schema'])
key = 'type={}, state={:5}, {}'.format(task['_type'], str(task['_state']), schema.table_name()) key = 'type={}, state={:5}, {}'.format(task['_type'], str(task['_state']), schema.table_name())
task_hist.add(key) task_hist.add(key)

View File

@@ -438,10 +438,9 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect()); const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect());
auto ps_ptr = qp.get_prepared(cache_key); auto ps_ptr = qp.get_prepared(cache_key);
shared_ptr<cql_transport::messages::result_message::prepared> prepared_msg;
if (!ps_ptr) { if (!ps_ptr) {
prepared_msg = co_await qp.prepare(req, qs, cql3::internal_dialect()); const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = prepared_msg->get_prepared(); ps_ptr = msg_ptr->get_prepared();
if (!ps_ptr) { if (!ps_ptr) {
on_internal_error(paxos_state::logger, "prepared statement is null"); on_internal_error(paxos_state::logger, "prepared statement is null");
} }
@@ -450,8 +449,8 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
-1, service::node_local_only::yes); -1, service::node_local_only::yes);
const auto st = ps_ptr->statement; const auto st = ps_ptr->statement;
const auto result_ptr = co_await st->execute(qp, qs, qo, std::nullopt); const auto msg_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
co_return cql3::untyped_result_set(result_ptr); co_return cql3::untyped_result_set(msg_ptr);
} }
template <typename... Args> template <typename... Args>

View File

@@ -4237,7 +4237,6 @@ public:
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker) , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator") , _async_gate("topology_coordinator")
{ {
_lifecycle_notifier.register_subscriber(this);
_db.get_notifier().register_listener(this); _db.get_notifier().register_listener(this);
// When the delay_cdc_stream_finalization error injection is disabled // When the delay_cdc_stream_finalization error injection is disabled
// (test releases it), wake the topology coordinator so it retries // (test releases it), wake the topology coordinator so it retries
@@ -4401,7 +4400,6 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
} }
future<> topology_coordinator::refresh_tablet_load_stats() { future<> topology_coordinator::refresh_tablet_load_stats() {
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
auto tm = get_token_metadata_ptr(); auto tm = get_token_metadata_ptr();
locator::load_stats stats; locator::load_stats stats;
@@ -4725,6 +4723,7 @@ future<> topology_coordinator::run() {
co_await _async_gate.close(); co_await _async_gate.close();
co_await std::move(tablet_load_stats_refresher); co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
co_await std::move(cdc_generation_publisher); co_await std::move(cdc_generation_publisher);
co_await std::move(cdc_streams_gc); co_await std::move(cdc_streams_gc);
co_await std::move(gossiper_orphan_remover); co_await std::move(gossiper_orphan_remover);
@@ -4737,8 +4736,6 @@ future<> topology_coordinator::stop() {
co_await _db.get_notifier().unregister_listener(this); co_await _db.get_notifier().unregister_listener(this);
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization"); utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
_topo_sm.on_tablet_split_ready = nullptr; _topo_sm.on_tablet_split_ready = nullptr;
co_await _lifecycle_notifier.unregister_subscriber(this);
co_await _tablet_load_stats_refresh.join();
// if topology_coordinator::run() is aborted either because we are not a // if topology_coordinator::run() is aborted either because we are not a
// leader anymore, or we are shutting down as a leader, we have to handle // leader anymore, or we are shutting down as a leader, we have to handle
@@ -4800,6 +4797,7 @@ future<> run_topology_coordinator(
topology_cmd_rpc_tracker}; topology_cmd_rpc_tracker};
std::exception_ptr ex; std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);
try { try {
rtlogger.info("start topology coordinator fiber"); rtlogger.info("start topology coordinator fiber");
co_await with_scheduling_group(group0.get_scheduling_group(), [&] { co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4820,7 +4818,7 @@ future<> run_topology_coordinator(
} }
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex)); on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
} }
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min)); co_await lifecycle_notifier.unregister_subscriber(&coordinator);
co_await coordinator.stop(); co_await coordinator.stop();
} }

View File

@@ -543,16 +543,11 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
// during SSTable writing and removed before sealing. If the write // during SSTable writing and removed before sealing. If the write
// failed before sealing, the file may still be on disk and must be // failed before sealing, the file may still be on disk and must be
// cleaned up explicitly. // cleaned up explicitly.
// The component is only defined for the `ms` sstable format; for
// older formats it is absent from the component map and looking up
// its filename would throw std::out_of_range.
// Use file_exists() to avoid a C++ exception on the common path // Use file_exists() to avoid a C++ exception on the common path
// where the file was already removed before sealing. // where the file was already removed before sealing.
if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) { auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes); if (co_await file_exists(temp_hashes)) {
if (co_await file_exists(temp_hashes)) { co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
}
} }
if (sync) { if (sync) {
co_await sst.sstable_write_io_check(sync_directory, dir_name.native()); co_await sst.sstable_write_io_check(sync_directory, dir_name.native());

View File

@@ -135,23 +135,7 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migr
} }
future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) { future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
// _prepared_stmt is a checked_weak_ptr into the prepared statements co_await cache_table_info(qp, mm, qs);
// cache and can be invalidated by a concurrent purge (e.g. on a schema
// change). cache_table_info() (re-)prepares and assigns _prepared_stmt,
// but the pin protecting the entry is dropped when try_prepare()
// returns. In release the chain of ready-future co_awaits back to here
// resumes synchronously, but debug builds preempt on every co_await
// even for ready futures, opening a window for a purge to drop the
// entry and leave _prepared_stmt null. Loop until a synchronous
// post-resume check finds _prepared_stmt valid; nothing can run between
// that check and the dereference below. _insert_stmt is a strong
// shared_ptr and is not affected by cache invalidation.
while (true) {
co_await cache_table_info(qp, mm, qs);
if (_prepared_stmt) {
break;
}
}
auto opts = opt_maker(); auto opts = opt_maker();
opts.prepare(_prepared_stmt->bound_names); opts.prepare(_prepared_stmt->bound_names);
co_await _insert_stmt->execute(qp, qs, opts, std::nullopt); co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);

View File

@@ -246,33 +246,6 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
}); });
} }
// Reproducer for SCYLLADB-1697
SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
return sstables::test_env::do_with_async([] (test_env& env) {
for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
testlog.info("Testing sstable version: {}", version);
auto sst = make_sstable_for_this_shard([&env, version] {
return env.make_sstable(test_table_schema(), version);
});
// Sanity: the TOC was written, otherwise the assertion below would be vacuous.
BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());
sst->unlink().get();
std::vector<sstring> remaining;
lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
[&remaining] (fs::path, directory_entry de) {
remaining.push_back(de.name);
return make_ready_future<>();
}).get();
BOOST_REQUIRE_MESSAGE(remaining.empty(),
fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
}
});
}
// Test the absence of TOC. Behavior is controllable by a flag // Test the absence of TOC. Behavior is controllable by a flag
SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) { SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
return sstables::test_env::do_with_async([] (test_env& env) { return sstables::test_env::do_with_async([] (test_env& env) {

View File

@@ -11,11 +11,13 @@ from typing import TYPE_CHECKING
from cassandra.auth import PlainTextAuthProvider from cassandra.auth import PlainTextAuthProvider
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.cluster.dtest.ccmlib.common import logger from test.cluster.dtest.ccmlib.common import logger
from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Iterable
from typing import Any from typing import Any
@@ -27,10 +29,6 @@ class ScyllaCluster:
self.manager = manager self.manager = manager
self.scylla_mode = scylla_mode self.scylla_mode = scylla_mode
self._config_options = {} self._config_options = {}
# Cached ScyllaNode instances. Nodes are appended by _add_nodes()
# in the order they are created by servers_add().
self._nodes: list[ScyllaNode] = []
self._next_node_num: int = 1
if self.scylla_mode == "debug": if self.scylla_mode == "debug":
self.default_wait_other_notice_timeout = 600 self.default_wait_other_notice_timeout = 600
@@ -41,20 +39,19 @@ class ScyllaCluster:
self.force_wait_for_cluster_start = force_wait_for_cluster_start self.force_wait_for_cluster_start = force_wait_for_cluster_start
def _add_nodes(self, servers: list) -> None: @staticmethod
"""Create ScyllaNode instances for the given servers and cache them.""" def _sorted_nodes(servers: Iterable[ServerInfo]) -> list[ServerInfo]:
for server in servers: return sorted(servers, key=lambda s: s.server_id)
name = f"node{self._next_node_num}"
self._next_node_num += 1
self._nodes.append(ScyllaNode(
cluster=self, server=server, name=name))
@property @property
def nodes(self) -> dict[str, ScyllaNode]: def nodes(self) -> dict[str, ScyllaNode]:
return {node.name: node for node in self.nodelist()} return {node.name: node for node in self.nodelist()}
def nodelist(self) -> list[ScyllaNode]: def nodelist(self) -> list[ScyllaNode]:
return list(self._nodes) return [
ScyllaNode(cluster=self, server=server, name=f"node{n}")
for n, server in enumerate(self._sorted_nodes(self.manager.all_servers()), start=1)
]
def get_node_ip(self, nodeid: int) -> str: def get_node_ip(self, nodeid: int) -> str:
return self.nodelist()[nodeid-1].address() return self.nodelist()[nodeid-1].address()
@@ -64,16 +61,16 @@ class ScyllaCluster:
self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra") self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra")
match nodes: match nodes:
case int(): case int():
self._add_nodes(self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")) self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")
case list(): case list():
for dc, n_nodes in enumerate(nodes, start=1): for dc, n_nodes in enumerate(nodes, start=1):
dc_name = f"dc{dc}" dc_name = f"dc{dc}"
self._add_nodes(self.manager.servers_add( self.manager.servers_add(
servers_num=n_nodes, servers_num=n_nodes,
config=self._config_options, config=self._config_options,
start=False, start=False,
auto_rack_dc=dc_name auto_rack_dc=dc_name
)) )
case dict(): case dict():
# Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}} # Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}}
for dc, dc_nodes in nodes.items(): for dc, dc_nodes in nodes.items():
@@ -82,7 +79,7 @@ class ScyllaCluster:
for rack, rack_nodes in dc_nodes.items(): for rack, rack_nodes in dc_nodes.items():
if not isinstance(rack_nodes, int): if not isinstance(rack_nodes, int):
raise RuntimeError(f"Unsupported topology specification: {nodes}") raise RuntimeError(f"Unsupported topology specification: {nodes}")
self._add_nodes(self.manager.servers_add( self.manager.servers_add(
servers_num=rack_nodes, servers_num=rack_nodes,
config=self._config_options, config=self._config_options,
property_file={ property_file={
@@ -90,7 +87,7 @@ class ScyllaCluster:
"rack": rack, "rack": rack,
}, },
start=False, start=False,
)) )
case _: case _:
raise RuntimeError(f"Unsupported topology specification: {nodes}") raise RuntimeError(f"Unsupported topology specification: {nodes}")

View File

@@ -17,7 +17,6 @@ from itertools import chain
from functools import cached_property from functools import cached_property
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import logging
from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
from test.pylib.internal_types import ServerUpState from test.pylib.internal_types import ServerUpState
@@ -29,9 +28,6 @@ if TYPE_CHECKING:
from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
logger = logging.getLogger("scylla_node")
NODETOOL_STDERR_IGNORED_PATTERNS = ( NODETOOL_STDERR_IGNORED_PATTERNS = (
re.compile(r"WARNING: debug mode. Not for benchmarking or production"), re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
re.compile( re.compile(
@@ -153,20 +149,15 @@ class ScyllaNode:
return self.cluster.scylla_mode return self.cluster.scylla_mode
def set_smp(self, smp: int) -> None: def set_smp(self, smp: int) -> None:
logger.debug(f"Setting smp: {self=} {smp=}")
self._smp_set_during_test = smp self._smp_set_during_test = smp
def smp(self) -> int: def smp(self) -> int:
logger.debug(f"Getting smp: {self=} _smp_set_during_test={self._smp_set_during_test} _smp={self._smp} {DEFAULT_SMP=}")
return self._smp_set_during_test or self._smp or DEFAULT_SMP return self._smp_set_during_test or self._smp or DEFAULT_SMP
def memory(self) -> int: def memory(self) -> int:
return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU
def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None: def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
if not memory and not smp:
return
logger.debug(f"Adjusting smp={smp} memory={memory} current_smp={self._smp} current_memory={self._memory}")
if memory: if memory:
self._memory = memory // (smp or self.smp()) * self.smp() self._memory = memory // (smp or self.smp()) * self.smp()
if smp: if smp:
@@ -455,8 +446,6 @@ class ScyllaNode:
self.mark = self.mark_log() self.mark = self.mark_log()
logger.debug(f"Starting server: server_id={self.server_id} {scylla_args=} {scylla_env=}")
self.cluster.manager.server_start( self.cluster.manager.server_start(
server_id=self.server_id, server_id=self.server_id,
seeds=None if self.bootstrap else [self.address()], seeds=None if self.bootstrap else [self.address()],

View File

@@ -1,46 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
import logging
import pytest
from dtest_class import Tester
logger = logging.getLogger(__file__)
@pytest.mark.single_node
class TestSetSmp(Tester):
"""Test that node.set_smp() properly persists across restarts."""
def _get_smp_from_log(self, node, from_mark=None):
"""Extract smp value from the node's log by looking at the SHARD_COUNT gossip value."""
matches = node.grep_log(r"SHARD_COUNT : Value\((\d+),\d+\)", from_mark=from_mark)
assert matches, "Could not find SHARD_COUNT in node log"
# Return the last match (most recent start)
return int(matches[-1][1].group(1))
def test_set_smp(self):
"""Verify that set_smp() takes effect on the next start."""
cluster = self.cluster
cluster.populate(1).start(wait_for_binary_proto=True)
node1 = cluster.nodelist()[0]
default_smp = self._get_smp_from_log(node1)
cluster.stop()
# set_smp to a different value and restart without jvm_args
target_smp = 1 if default_smp != 1 else 2
node1.set_smp(target_smp)
mark = node1.mark_log()
cluster.start(wait_for_binary_proto=True)
node1 = cluster.nodelist()[0]
actual_smp = self._get_smp_from_log(node1, from_mark=mark)
assert actual_smp == target_smp, \
f"Expected smp={target_smp} after set_smp({target_smp}), got {actual_smp}"

View File

@@ -29,16 +29,12 @@ import pytest
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.auth import PlainTextAuthProvider from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.connection import UnixSocketEndPoint
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.query import BatchStatement, BatchType, SimpleStatement, named_tuple_factory from cassandra.query import BatchStatement, BatchType, SimpleStatement, named_tuple_factory
from test.cluster.conftest import cluster_con
from test.cluster.dtest.dtest_class import create_ks, wait_for from test.cluster.dtest.dtest_class import create_ks, wait_for
from test.cluster.dtest.tools.assertions import assert_invalid from test.cluster.dtest.tools.assertions import assert_invalid
from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
from test.pylib.driver_utils import safe_driver_shutdown
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier from test.pylib.rest_client import read_barrier
from test.pylib.skip_types import skip_env from test.pylib.skip_types import skip_env
@@ -277,7 +273,6 @@ class AuditEntry:
statement: str statement: str
table: str table: str
user: str user: str
source: str = "127.0.0.1"
class AuditBackend: class AuditBackend:
@@ -454,13 +449,6 @@ class AuditBackendSyslog(AuditBackend):
entries.append(self.line_to_row(line, idx)) entries.append(self.line_to_row(line, idx))
return { self.audit_mode(): entries } return { self.audit_mode(): entries }
@staticmethod
def _parse_address(addr_port):
"""Extract IP from 'ip:port' (IPv4) or '[ip]:port' (IPv6)."""
if addr_port.startswith("["):
return addr_port[1:addr_port.index("]")]
return addr_port.split(":")[0]
def line_to_row(self, line, idx): def line_to_row(self, line, idx):
metadata, data = line.split(": ", 1) metadata, data = line.split(": ", 1)
data = "".join(data.splitlines()) # Remove newlines data = "".join(data.splitlines()) # Remove newlines
@@ -472,9 +460,9 @@ class AuditBackendSyslog(AuditBackend):
# and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59) # and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59)
date = datetime.datetime(2000, 1, 1, 0, 0) date = datetime.datetime(2000, 1, 1, 0, 0)
node = self._parse_address(match.group("node")) node = match.group("node").split(":")[0]
statement = match.group("query").replace("\\", "") statement = match.group("query").replace("\\", "")
source = self._parse_address(match.group("client_ip")) source = match.group("client_ip").split(":")[0]
event_time = uuid.UUID(int=idx) event_time = uuid.UUID(int=idx)
t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username")) t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username"))
return t return t
@@ -594,7 +582,6 @@ class CQLAuditTester(AuditTester):
user="anonymous", user="anonymous",
cl="ONE", cl="ONE",
error=False, error=False,
source="127.0.0.1",
): ):
self.assert_audit_row_fields(row) self.assert_audit_row_fields(row)
assert row.node in self.server_addresses assert row.node in self.server_addresses
@@ -603,7 +590,7 @@ class CQLAuditTester(AuditTester):
assert row.error == error assert row.error == error
assert row.keyspace_name == ks assert row.keyspace_name == ks
assert row.operation == statement assert row.operation == statement
assert row.source == source assert row.source == "127.0.0.1"
assert row.table_name == table assert row.table_name == table
assert row.username == user assert row.username == user
@@ -827,7 +814,7 @@ class CQLAuditTester(AuditTester):
sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username)) sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
assert len(sorted_new_rows) == len(expected_entries) assert len(sorted_new_rows) == len(expected_entries)
for row, entry in zip(sorted_new_rows, sorted(expected_entries)): for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error, entry.source) self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
async def verify_keyspace(self, audit_settings=None, helper=None): async def verify_keyspace(self, audit_settings=None, helper=None):
""" """
@@ -1867,44 +1854,6 @@ class CQLAuditTester(AuditTester):
finally: finally:
session.execute("DROP KEYSPACE IF EXISTS kss") session.execute("DROP KEYSPACE IF EXISTS kss")
# Unix domain sockets have no IP peer address. Seastar's
# socket_address::addr() falls through to the default case for
# AF_UNIX and returns a zero-initialised in6_addr, i.e. "::".
MAINTENANCE_SOCKET_SOURCE = "::"
async def _test_audit_maintenance_socket_user_creation(self, manager, helper_class):
with helper_class() as helper:
session = await self.prepare(
user="cassandra", password="cassandra",
helper=helper,
audit_settings={**helper.audit_default_settings, "audit_categories": "DCL", "audit_keyspaces": ""},
create_keyspace=False,
)
servers = await manager.running_servers()
server = servers[0]
socket_path = await manager.server_get_maintenance_socket_path(server.server_id)
logger.info("Connecting to maintenance socket")
endpoint = UnixSocketEndPoint(socket_path)
maint_cluster = cluster_con([endpoint],
load_balancing_policy=WhiteListRoundRobinPolicy([endpoint]))
maint_session = maint_cluster.connect()
role_name = "audit_test_admin"
create_stmt = f"CREATE ROLE {role_name} WITH PASSWORD = 'secret' AND SUPERUSER = true AND LOGIN = true"
expected_operation = f"CREATE ROLE {role_name} WITH PASSWORD = '***' AND SUPERUSER = true AND LOGIN = true"
logger.info("Creating superuser via maintenance socket and verifying audit entry")
expected_entries = [AuditEntry(category="DCL", statement=expected_operation,
user="anonymous", table="", ks="", cl="LOCAL_QUORUM", error=False,
source=self.MAINTENANCE_SOCKET_SOURCE)]
with self.assert_entries_were_added(session, expected_entries):
maint_session.execute(create_stmt)
logger.info("Cleaning up created role")
maint_session.execute(f"DROP ROLE IF EXISTS {role_name}")
safe_driver_shutdown(maint_cluster)
# AuditBackendTable, no auth, rf=1 # AuditBackendTable, no auth, rf=1
@@ -1997,14 +1946,6 @@ async def test_service_level_statements_standalone(manager: ManagerClient):
await CQLAuditTester(manager)._test_service_level_statements() await CQLAuditTester(manager)._test_service_level_statements()
async def test_audit_maintenance_socket_user_creation(manager: ManagerClient):
"""Verify that creating a superuser via the maintenance socket is audited."""
t = CQLAuditTester(manager)
await t._test_audit_maintenance_socket_user_creation(manager, AuditBackendTable)
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
await t._test_audit_maintenance_socket_user_creation(manager, Syslog)
# AuditBackendSyslog, no auth, rf=1 # AuditBackendSyslog, no auth, rf=1
async def test_audit_syslog_noauth(manager: ManagerClient): async def test_audit_syslog_noauth(manager: ManagerClient):

View File

@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts from test.pylib.util import wait_for_cql_and_get_hosts
from cassandra.query import ConsistencyLevel, SimpleStatement from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,30 +880,41 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification # affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC # divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection. # on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
class _LeadershipTransferred(Exception): # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
"""Raised when leadership transferred to servers[1] during the test, requiring a retry.""" # UNREPAIRED compaction view, making the race easy to trigger deterministically.
pass await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key): # Disable autocompaction everywhere so we control exactly when compaction runs.
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data. for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
Returns the next current_key value.
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
restart, signalling the caller to retry.
"""
# Ensure servers[1] is not the topology coordinator. If the coordinator is
# restarted, the Raft leader dies, a new election occurs, and the new
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
# and marking post-repair data as repaired. That legitimate re-repair masks
# the compaction-merge bug this test detects.
coord = await get_topology_coordinator(manager) coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord) coord_serv = await find_server_by_host_id(manager, servers, coord)
if coord_serv == servers[1]:
other = next(s for s in servers if s != servers[1])
await ensure_group0_leader_on(manager, other)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id) coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark() coord_mark = await coord_log.mark()
@@ -967,16 +978,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
await manager.server_start(target.server_id) await manager.server_start(target.server_id)
await manager.servers_see_each_other(servers) await manager.servers_see_each_other(servers)
# Check if leadership transferred to servers[1] during the restart.
# If so, the new coordinator will re-initiate repair, masking the bug.
new_coord = await get_topology_coordinator(manager)
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
if new_coord_serv == servers[1]:
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
await manager.api.wait_task(servers[0].ip_addr, task_id)
raise _LeadershipTransferred(
"servers[1] became topology coordinator after restart")
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys, # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
# confirming that the bug was triggered (S1' and E merged during the race window). # confirming that the bug was triggered (S1' and E merged during the race window).
deadline = time.time() + 60 deadline = time.time() + 60
@@ -999,7 +1000,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
if not compaction_ran: if not compaction_ran:
logger.warning("Compaction did not merge S1' and E after restart during the race window; " logger.warning("Compaction did not merge S1' and E after restart during the race window; "
"the bug was not triggered. Skipping assertion.") "the bug was not triggered. Skipping assertion.")
return current_key return
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED. # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1030,9 +1031,8 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, " f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}") f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
# servers[0] and servers[2] were never restarted and the coordinator stayed # servers[0] and servers[2] flushed post-repair keys after the race window closed,
# alive throughout, so no re-repair could have flushed their memtables. # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
# Post-repair keys must NOT appear in repaired sstables on these servers.
assert not (repaired_keys_0 & post_repair_key_set), \ assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \ f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}" f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,54 +1053,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"on servers[1] after restart lost the being_repaired markers during the race window. " \ f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \ f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}" f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
return current_key
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
# If leadership transfers to servers[1] between our coordinator check and the
# restart, the coordinator change masks the bug. Detect and retry.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
current_key = await _do_race_window_promotes_unrepaired_data(
manager, servers, cql, ks, token, scylla_path, current_key)
return
except _LeadershipTransferred as e:
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
"could not run the test without coordinator interference.")
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# Tombstone GC safety tests # Tombstone GC safety tests

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
# #
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table from test.cluster.util import get_topology_coordinator, trigger_stepdown
import pytest import pytest
import logging import logging
@@ -83,78 +83,3 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager) coord3 = await get_topology_coordinator(manager)
if coord3: if coord3:
break break
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
"""Verify that _tablet_load_stats_refresh is properly joined during
topology coordinator shutdown, even when a schema change notification
triggers a refresh between run() completing and stop() being called.
Reproduces the scenario using two injection points:
- topology_coordinator_pause_before_stop: pauses after run() finishes
but before stop() is called
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
so it's still in-flight during shutdown
Without the join in stop(), the refresh task outlives the coordinator
and accesses freed memory.
"""
servers = await manager.servers_add(3)
await manager.get_ready_cql(servers)
async with new_test_keyspace(manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
coord = await get_topology_coordinator(manager)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
coord_idx = host_ids.index(coord)
coord_server = servers[coord_idx]
log = await manager.server_open_log(coord_server.server_id)
mark = await log.mark()
# Injection B: pause between run() returning and stop() being called.
await manager.api.enable_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
# Stepdown causes the topology coordinator to abort and shut down.
logger.info("Triggering stepdown on coordinator")
await trigger_stepdown(manager, coord_server)
# Wait for injection B to fire. The coordinator has finished run() but
# the schema change listener is still registered.
mark, _ = await log.wait_for(
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
# Enable it now so it only catches the notification-triggered call.
await manager.api.enable_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
# CREATE TABLE fires on_create_column_family on the old coordinator which
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
# via with_scheduling_group on the gossip scheduling group.
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
# Release injection B: coordinator proceeds through stop().
# Without the fix, stop() returns quickly and run_topology_coordinator
# frees the topology_coordinator frame. With the fix, stop() blocks at
# _tablet_load_stats_refresh.join() until injection A is released.
logger.info("Releasing injection B: coordinator will stop")
await manager.api.message_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
# Release injection A: refresh_tablet_load_stats() resumes and accesses
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
# points to freed memory and ASan detects heap-use-after-free.
logger.info("Releasing injection A: refresh resumes")
await manager.api.message_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
# If the bug is present, the node crashed. read_barrier will fail.
await read_barrier(manager.api, coord_server.ip_addr)

View File

@@ -435,9 +435,8 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None: async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr): async def get_replication_options(ks: str):
await read_barrier(manager.api, ip_addr) res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication) repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl return repl
@@ -452,44 +451,43 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
host_ids = [await manager.get_host_id(s.server_id) for s in servers] host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql() cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);") await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};") await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
await cql.run_async("create table ks2.t (pk int primary key);") await cql.run_async("create table ks2.t (pk int primary key);")
repl = await get_replication_options("ks2", host, servers[0].ip_addr) repl = await get_replication_options("ks2")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
assert repl['dc2'] == '2' assert repl['dc2'] == '2'
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};") await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks3.t (pk int primary key);") await cql.run_async("create table ks3.t (pk int primary key);")
repl = await get_replication_options("ks3", host, servers[0].ip_addr) repl = await get_replication_options("ks3")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};") await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks4.t (pk int primary key);") await cql.run_async("create table ks4.t (pk int primary key);")
repl = await get_replication_options("ks4", host, servers[0].ip_addr) repl = await get_replication_options("ks4")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks5.t (pk int primary key);") await cql.run_async("create table ks5.t (pk int primary key);")
repl = await get_replication_options("ks5", host, servers[0].ip_addr) repl = await get_replication_options("ks5")
assert repl['dc1'] == '2' assert repl['dc1'] == '2'
assert repl['dc2'] == '2' assert repl['dc2'] == '2'
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks6.t (pk int primary key);") await cql.run_async("create table ks6.t (pk int primary key);")
repl = await get_replication_options("ks6", host, servers[0].ip_addr) repl = await get_replication_options("ks6")
assert repl['dc1'] == '2' assert repl['dc1'] == '2'
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers] [await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};") await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b'] assert repl['dc1'] == ['rack1b']
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t") tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -499,7 +497,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
assert r.replicas[0][0] == host_ids[1] assert r.replicas[0][0] == host_ids[1]
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};") await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr) repl = await get_replication_options("ks2")
assert repl['dc1'] == ['rack1a'] assert repl['dc1'] == ['rack1a']
assert len(repl['dc2']) == 2 assert len(repl['dc2']) == 2
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2'] assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -525,13 +523,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
pass pass
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};") await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
repl = await get_replication_options("ks5", host, servers[0].ip_addr) repl = await get_replication_options("ks5")
assert len(repl['dc1']) == 2 assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1'] assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
assert repl['dc2'] == '2' assert repl['dc2'] == '2'
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};") await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
repl = await get_replication_options("ks6", host, servers[0].ip_addr) repl = await get_replication_options("ks6")
assert repl['dc1'] == '2' assert repl['dc1'] == '2'
assert len(repl['dc2']) == 1 assert len(repl['dc2']) == 1
assert repl['dc2'][0] == 'rack2a' assert repl['dc2'][0] == 'rack2a'
@@ -539,9 +537,8 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None: async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr): async def get_replication_options(ks: str):
await read_barrier(manager.api, ip_addr) res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication) repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl return repl
@@ -554,11 +551,10 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})] await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
cql = manager.get_cql() cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);") await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}") await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -578,19 +574,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
servers = servers[0:-1] servers = servers[0:-1]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};") await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b'] assert repl['dc1'] == ['rack1b']
logging.info("Rolling restart") logging.info("Rolling restart")
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"]) await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])
await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr) repl = await get_replication_options("ks2")
assert len(repl['dc1']) == 2 assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1'] assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks3", host, servers[0].ip_addr) repl = await get_replication_options("ks3")
assert len(repl['dc1']) == 1 assert len(repl['dc1']) == 1
assert len(repl['dc2']) == 1 assert len(repl['dc2']) == 1
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1'] assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -606,7 +602,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
assert failed assert failed
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};") await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert len(repl['dc1']) == 1 assert len(repl['dc1']) == 1
assert repl['dc1'][0] == 'rack1b' assert repl['dc1'][0] == 'rack1b'
assert len(repl['dc2']) == 1 assert len(repl['dc2']) == 1
@@ -1113,9 +1109,8 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None: async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr): async def get_replication_options(ks: str):
await read_barrier(manager.api, ip_addr) res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication) repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl return repl
@@ -1133,11 +1128,10 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
host_ids = [await manager.get_host_id(s.server_id) for s in servers] host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql() cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);") await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers] [await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1171,7 +1165,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
failed = True failed = True
assert failed assert failed
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@@ -18,7 +18,6 @@
#include <seastar/testing/test_case.hh> #include <seastar/testing/test_case.hh>
#include "test/lib/exception_utils.hh" #include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh" #include "test/lib/test_utils.hh"
#include "ldap_common.hh" #include "ldap_common.hh"
#include "service/migration_manager.hh" #include "service/migration_manager.hh"
@@ -682,41 +681,3 @@ SEASTAR_TEST_CASE(ldap_config) {
}, },
make_ldap_config()); make_ldap_config());
} }
// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
auto cfg = make_ldap_config();
cfg->permissions_update_interval_in_ms.set(1);
auto call_count = seastar::make_lw_shared<int>(0);
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
auto& cache = env.auth_cache().local();
testlog.info("Populating 50 cache entries");
for (int i = 0; i < 50; i++) {
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
cache.get_permissions(auth::role_or_anonymous(), r).get();
}
testlog.info("Installing slow permission loader (10ms per call)");
cache.set_permission_loader(
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
-> seastar::future<auth::permission_set> {
++(*call_count);
co_await seastar::sleep(std::chrono::milliseconds(10));
co_return auth::permission_set();
});
testlog.info("Waiting for pruner to start reloading");
while (*call_count == 0) {
seastar::sleep(std::chrono::milliseconds(1)).get();
}
testlog.info("Pruner started, letting teardown run");
}, cfg);
testlog.info("Loader called {} times", *call_count);
}

View File

@@ -371,13 +371,9 @@ int scylla_simple_query_main(int argc, char** argv) {
audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) { audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) {
fmt::print("audit start failed: {}", e); fmt::print("audit start failed: {}", e);
}).get(); }).get();
audit::audit::start_storage(env.local_db().get_config()).get();
auto audit_stop = defer([] { auto audit_stop = defer([] {
audit::audit::stop_audit().get(); audit::audit::stop_audit().get();
}); });
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
auto results = do_cql_test(env, cfg); auto results = do_cql_test(env, cfg);
aggregated_perf_results agg(results); aggregated_perf_results agg(results);
std::cout << agg << std::endl; std::cout << agg << std::endl;

View File

@@ -126,9 +126,6 @@ class CppFile(pytest.File, ABC):
return args return args
def collect(self) -> Iterator[CppTestCase]: def collect(self) -> Iterator[CppTestCase]:
if BUILD_MODE not in self.stash:
return
custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS) custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
for test_case in self.list_test_cases(): for test_case in self.list_test_cases():

View File

@@ -163,11 +163,6 @@ def scylla_binary(testpy_test) -> Path:
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None: def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
items[:] = [
item for item in items
if (parent_file := item.getparent(cls=pytest.File)) is not None
and BUILD_MODE in parent_file.stash
]
for item in items: for item in items:
modify_pytest_item(item=item) modify_pytest_item(item=item)
@@ -290,10 +285,7 @@ def pytest_configure(config: pytest.Config) -> None:
pytest_log_dir.mkdir(parents=True, exist_ok=True) pytest_log_dir.mkdir(parents=True, exist_ok=True)
if not _pytest_config.getoption("--save-log-on-success"): if not _pytest_config.getoption("--save-log-on-success"):
for file in pytest_log_dir.glob("*"): for file in pytest_log_dir.glob("*"):
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them. file.unlink()
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
# condition, especially with repeat.
file.unlink(missing_ok=True)
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log" _pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
@@ -348,8 +340,7 @@ def pytest_collect_file(file_path: pathlib.Path,
repeats = list(product(build_modes, parent.config.run_ids)) repeats = list(product(build_modes, parent.config.run_ids))
if not repeats: if not repeats:
parent.stash[REPEATING_FILES].remove(file_path) return []
return collectors
ihook = parent.ihook ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable( collectors = list(chain(collectors, chain.from_iterable(

View File

@@ -75,7 +75,6 @@ def test_no_bare_skip_markers_in_collection():
"--collect-only", "--collect-only",
"--ignore=boost", "--ignore=raft", "--ignore=boost", "--ignore=raft",
"--ignore=ldap", "--ignore=vector_search", "--ignore=ldap", "--ignore=vector_search",
"--ignore=unit",
"-p", "no:sugar"], "-p", "no:sugar"],
capture_output=True, text=True, capture_output=True, text=True,
cwd=str(_TEST_ROOT), cwd=str(_TEST_ROOT),