Compare commits

..

13 Commits

Author SHA1 Message Date
Benny Halevy
211eb7c32b test/cluster/dtest: wide_rows_test.py: use prepared statements
Replace string-formatted CQL queries in loops with prepared
statements and bind parameters. This avoids repeated query parsing
on the server side and eliminates CQL injection risk from string
interpolation.

Functions converted:
- test_column_index_stress: INSERT (100k iterations) and SELECT (10k)
- create_large_partition_data: UPDATE with TIMESTAMP
- create_large_row_data: UPDATE per column
- create_too_many_rows_data: UPDATE for columns and collections
- delete_too_many_rows_data: DELETE for columns and collections
- create_large_row_static_data: INSERT
- set_ttl_on_few_rows_in_partition: SELECT and UPDATE with TTL
- set_ttl_on_few_large_rows: SELECT and UPDATE with TTL
2026-04-28 11:40:16 +03:00
Benny Halevy
3a640a9ff0 test/cluster/dtest: wide_rows_test.py: randomize compaction strategy
Replace parametrized compaction strategy (4 strategies × 31 tests =
124 test cases) with random selection per test. This reduces the
test count to 31 while still covering all strategies over time.

Add --compaction-strategy option to allow reproducing failures with
a specific strategy, e.g.:
  ./test.py --mode=dev test/cluster/dtest/wide_rows_test.py \
    --pytest-arg="--compaction-strategy=LeveledCompactionStrategy"
2026-04-28 11:40:16 +03:00
Benny Halevy
8288f87beb test/cluster/dtest: wide_rows_test.py: reduce TTL sleep time
Reduce TTL from 60 to 1 second and sleep time from ttl+5 to ttl+1
in set_ttl_on_few_rows_in_partition() and set_ttl_on_few_large_rows().
The original 60-second TTL was unnecessarily high, adding over a
minute of idle wait time per TTL test invocation.
2026-04-28 11:40:16 +03:00
Benny Halevy
77c03354f4 test/cluster/dtest: wide_rows_test.py: fix key_appearance accumulation
In validate_entities_recognized_as_large(), key_appearance was
overwritten on each loop iteration instead of being accumulated.
This meant that for entity_type == "cell" in multi-node clusters,
entities_count only reflected the last node's count rather than
the total across all nodes. Fix by using += to accumulate.

Update expected_entity_number in test_large_cell_in_materialized_view
to account for RF=3 replication (each cell appears on all 3 nodes).

Bug inherited from scylla-dtest.
2026-04-28 11:40:16 +03:00
Benny Halevy
49df9242f7 test/cluster/dtest: wide_rows_test.py: scope compact to test keyspace/table
Pass KEYSPACE_NAME and TABLE_NAME to cluster.compact() instead of
compacting all keyspaces. This avoids unnecessary compaction of
system tables, making tests faster.

Also convert remaining nodetool("compact ...") calls to use
cluster.compact() for consistency.
2026-04-28 11:40:16 +03:00
Benny Halevy
8c82c6646b test/cluster/dtest: wide_rows_test.py: fix expect_warning mutation across nodes
In validate_log_warnings(), expect_warning was reassigned inside the
per-node loop, so if the first node set it to False (due to no
sstables on disk), all subsequent nodes would inherit that value
regardless of their own state.

Use a local variable (node_expect_warning) instead of mutating the
function parameter.
2026-04-28 11:40:16 +03:00
Benny Halevy
d76d0b8a16 test/cluster/dtest: wide_rows_test.py: remove dead code
Remove validation_small_entity() and get_large_entity_info() methods.
These are not called by any test in the migrated file.
get_large_entity_info() also had a bug where the CQL query used
escaped braces ({{keyspace_name}}) instead of actual parameter
substitution, so it would have queried for literal '{keyspace_name}'.
2026-04-28 11:40:16 +03:00
Benny Halevy
690672e4cb test/cluster/dtest: wide_rows_test.py: cosmetic cleanups
Fix typos: aproximately, quering, colection, table_nam, the the.
Fix grammar: 'verify the they didn't recognized as large'.
Use idiomatic 'not in' instead of 'not x in'.
Remove unused variable assignment and commented-out debug line.
Remove unnecessary f-string prefix.
Fix '/n' to use actual newline in error message formatting.
Fix extra trailing quotes in exception messages.
Remove redundant variable assignment (maximum_primary_key_value).
2026-04-28 11:40:13 +03:00
Benny Halevy
85079d7c7a test/cluster/dtest: migrate wide_rows_test.py from scylla-dtest
Adapt wide_rows_test.py to work with the in-tree cluster test
framework:
- Replace dtest imports with in-tree equivalents
- Replace self.cluster.flush() + self.cluster.wait_for_compactions()
  with self.cluster.compact() since nodetool compact handles flush
  and waiting internally
- Add inline wait_for_view() helper (replaces async version)
- Replace node.status with is_running() check
- Add copyright header

Remove from skip_in_dev now that all tests pass.
2026-04-28 11:39:47 +03:00
Benny Halevy
70f8fcbe67 test/cluster/dtest: cache ScyllaNode hostid
Cache the host ID in ScyllaNode._hostid so that hostid() returns
the cached value when the node is stopped.  Without this,
watch_log_for_death() fails with a timeout because it tries to
query the stopped node's API to get its host ID for the log
pattern match.
2026-04-28 11:36:08 +03:00
Benny Halevy
1d6403ddad test/cluster/dtest: add ScyllaCluster.compact() method
Add compact() method to ScyllaCluster, delegating to
ScyllaNode.compact() on each running node. Accepts optional
keyspace and tables parameters to allow scoping compaction to
specific keyspaces/tables.

Also fix ScyllaNode.compact() to use list[str] for tables
parameter and extend() instead of +=, so that passing a single
table name as a string does not iterate over its characters.
2026-04-28 11:36:08 +03:00
Benny Halevy
9a24be2fe9 test/cluster/dtest: add assertion helpers for wide_rows_test
Add assert_equal_more_with_deviation() and assert_less_equal_lists()
to tools/assertions.py.  These are needed by the wide_rows_test.py
migration from scylla-dtest.
2026-04-28 11:36:08 +03:00
Benny Halevy
5c93ccb6d8 test/cluster/dtest: copy wide_rows_test.py verbatim from scylla-dtest
Copy wide_rows_test.py as-is from scylla-dtest. The test is added
to run_in_dev but also skip_in_dev in test_config.yaml since it
requires functional changes to work with the in-tree test
framework. The next commit will make the necessary changes and
remove it from skip_in_dev.
2026-04-28 11:36:08 +03:00
34 changed files with 1677 additions and 700 deletions

View File

@@ -194,36 +194,22 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
std::move(audited_keyspaces),
std::move(audited_tables),
std::move(audited_categories),
std::cref(cfg));
}
future<> audit::start_storage(const db::config& cfg) {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit._storage_helper_ptr->start(cfg).then([&local_audit] {
local_audit._storage_running = true;
std::cref(cfg))
.then([&cfg] {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit.start(cfg);
});
});
}
future<> audit::stop_storage() {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([] (audit& local_audit) {
local_audit._storage_running = false;
return local_audit._storage_helper_ptr->stop();
});
}
future<> audit::stop_audit() {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) {
SCYLLA_ASSERT(!local_audit._storage_running);
return local_audit.shutdown();
}).then([] {
return audit::audit::audit_instance().stop();
@@ -237,6 +223,14 @@ audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& k
return std::make_unique<audit_info>(cat, keyspace, table, batch);
}
future<> audit::start(const db::config& cfg) {
return _storage_helper_ptr->start(cfg);
}
future<> audit::stop() {
return _storage_helper_ptr->stop();
}
future<> audit::shutdown() {
return make_ready_future<>();
}
@@ -247,12 +241,6 @@ future<> audit::log(const audit_info& audit_info, const service::client_state& c
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
socket_address client_ip = client_state.get_client_address().addr();
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
audit_info.query(), client_ip, audit_info.table(), username));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
@@ -298,11 +286,6 @@ future<> inspect(const audit_info_alternator& ai, const service::client_state& c
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}",
node_ip, client_ip, username, error ? "true" : "false"));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
node_ip, client_ip, username, error ? "true" : "false");

View File

@@ -141,7 +141,6 @@ private:
category_set _audited_categories;
std::unique_ptr<storage_helper> _storage_helper_ptr;
bool _storage_running = false;
const db::config& _cfg;
utils::observer<sstring> _cfg_keyspaces_observer;
@@ -164,8 +163,6 @@ public:
return audit_instance().local();
}
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> start_storage(const db::config& cfg);
static future<> stop_storage();
static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false);
audit(locator::shared_token_metadata& stm,
@@ -177,6 +174,8 @@ public:
category_set&& audited_categories,
const db::config& cfg);
~audit();
future<> start(const db::config& cfg);
future<> stop();
future<> shutdown();
bool should_log(const audit_info& audit_info) const;
bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const;

View File

@@ -258,11 +258,13 @@ future<> ldap_role_manager::start() {
} catch (const seastar::sleep_aborted&) {
co_return; // ignore
}
try {
co_await _cache.reload_all_permissions();
} catch (...) {
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
try {
co_await c.reload_all_permissions();
} catch (...) {
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
});
}
});
return _std_mgr.start();

View File

@@ -157,20 +157,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
return create_legacy_keyspace_if_missing(mm);
});
}
// Authorizer must be started before the permission loader is set,
// because the loader calls _authorizer->authorize().
// The loader must be set before starting the role manager, because
// LDAP role manager starts a pruner fiber that calls
// reload_all_permissions() which asserts _permission_loader is set.
co_await _authorizer->start();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
co_await _role_manager->start();
if (this_shard_id() == 0) {
// Role manager and password authenticator have this odd startup
@@ -179,19 +165,21 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
// creation therefore we need to wait here.
co_await _role_manager->ensure_superuser_is_created();
}
// Authenticator must be started after ensure_superuser_is_created()
// because password_authenticator queries system.roles for the
// superuser entry created by the role manager.
co_await _authenticator->start();
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
}
future<> service::stop() {
_as.request_abort();
// Reverse of start() order.
co_await _authenticator->stop();
co_await _role_manager->stop();
_cache.set_permission_loader(nullptr);
co_await _authorizer->stop();
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
}
future<> service::ensure_superuser_is_created() {

View File

@@ -593,7 +593,6 @@ scylla_tests = set([
'test/boost/linearizing_input_stream_test',
'test/boost/lister_test',
'test/boost/locator_topology_test',
'test/boost/lock_tables_metadata_test',
'test/boost/log_heap_test',
'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
'test/boost/logalloc_test',
@@ -1711,7 +1710,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/sstable_compression_config_test.cc',
'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc',
'test/boost/sstable_tablet_streaming_test.cc',
'test/boost/sstable_tablet_streaming.cc',
'test/boost/statement_restrictions_test.cc',
'test/boost/storage_proxy_test.cc',
'test/boost/tablets_test.cc',

View File

@@ -399,10 +399,9 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
}
}
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
auto ack2_msg_str = fmt::format("{}", ack2_msg);
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
}
// Depends on

46
main.cc
View File

@@ -1810,18 +1810,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
utils::get_local_injector().inject("stop_after_starting_migration_manager",
[] { std::raise(SIGSTOP); });
// Audit must be constructed before the maintenance socket so
// that on shutdown (reverse destruction order) the audit service
// outlives the maintenance socket and in-flight queries can
// still reach audit::inspect() safely.
checkpoint(stop_signal, "starting audit service");
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
// XXX: stop_raft has to happen before query_processor and migration_manager
// is stopped, since some groups keep using the query
// processor until are stopped inside stop_raft.
@@ -2352,22 +2340,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();
stop_signal.ready(false);
// At this point, `locator::topology` should be stable, i.e. we should have complete information
// about the layout of the cluster (= list of nodes along with the racks/DCs).
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(token_metadata.local().get());
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
// The table-based audit backend needs Raft (via join_cluster)
// to create its keyspace and table.
checkpoint(stop_signal, "starting audit storage");
audit::audit::start_storage(*cfg).get();
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
if (cfg->maintenance_socket() != "ignore") {
// Enable role operations now that node joined the cluster
maintenance_auth_service.invoke_on_all([](auth::service& svc) {
@@ -2377,6 +2349,24 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
}
// At this point, `locator::topology` should be stable, i.e. we should have complete information
// about the layout of the cluster (= list of nodes along with the racks/DCs).
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(token_metadata.local().get());
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
// Start audit service after join_cluster so that the table-based audit backend
// can properly create its keyspace and table.
checkpoint(stop_signal, "starting audit service");
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
// Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.

View File

@@ -1328,27 +1328,9 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,
future<tables_metadata_lock_on_all_shards> database::lock_tables_metadata(sharded<database>& sharded_db) {
tables_metadata_lock_on_all_shards locks;
// Acquire write lock on shard 0 first, and then on the remaining shards.
//
// Parallel acquisition on all shards could deadlock when two
// fibers call lock_tables_metadata() concurrently: parallel_for_each
// sends SMP messages to all shards even when the local shard's lock
// attempt blocks. If task reordering (SEASTAR_SHUFFLE_TASK_QUEUE in
// debug/sanitize builds) causes fiber A to win on shard X while
// fiber B wins on shard Y, neither can make progress — classic
// cross-shard lock-ordering deadlock.
//
// Acquiring the write lock on shard 0 first, and then on the remaining
// shards, eliminates this: whichever fiber acquires shard 0 first is
// guaranteed to acquire locks on all other shards before the other fiber
// can acquire the lock on shard 0.
co_await sharded_db.invoke_on(0, [&locks, &sharded_db] (auto& db) -> future<> {
co_await sharded_db.invoke_on_all([&] (auto& db) -> future<> {
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
co_await sharded_db.invoke_on_others([&locks] (auto& db) -> future<> {
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
});
});
co_return locks;
}

View File

@@ -5466,9 +5466,10 @@ class scylla_compaction_tasks(gdb.Command):
try:
task_list = list(intrusive_list(cm['_tasks']))
except gdb.error: # 6.2 compatibility
task_list = [seastar_shared_ptr(t).get().dereference() for t in std_list(cm['_tasks'])]
task_list = list(std_list(cm['_tasks']))
for task in task_list:
task = seastar_shared_ptr(task).get().dereference()
schema = schema_ptr(task['_compacting_table'].dereference()['_schema'])
key = 'type={}, state={:5}, {}'.format(task['_type'], str(task['_state']), schema.table_name())
task_hist.add(key)

View File

@@ -438,10 +438,9 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect());
auto ps_ptr = qp.get_prepared(cache_key);
shared_ptr<cql_transport::messages::result_message::prepared> prepared_msg;
if (!ps_ptr) {
prepared_msg = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = prepared_msg->get_prepared();
const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = msg_ptr->get_prepared();
if (!ps_ptr) {
on_internal_error(paxos_state::logger, "prepared statement is null");
}
@@ -450,8 +449,8 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
-1, service::node_local_only::yes);
const auto st = ps_ptr->statement;
const auto result_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
co_return cql3::untyped_result_set(result_ptr);
const auto msg_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
co_return cql3::untyped_result_set(msg_ptr);
}
template <typename... Args>

View File

@@ -4237,7 +4237,6 @@ public:
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator")
{
_lifecycle_notifier.register_subscriber(this);
_db.get_notifier().register_listener(this);
// When the delay_cdc_stream_finalization error injection is disabled
// (test releases it), wake the topology coordinator so it retries
@@ -4401,7 +4400,6 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
}
future<> topology_coordinator::refresh_tablet_load_stats() {
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
auto tm = get_token_metadata_ptr();
locator::load_stats stats;
@@ -4725,6 +4723,7 @@ future<> topology_coordinator::run() {
co_await _async_gate.close();
co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
co_await std::move(cdc_generation_publisher);
co_await std::move(cdc_streams_gc);
co_await std::move(gossiper_orphan_remover);
@@ -4737,8 +4736,6 @@ future<> topology_coordinator::stop() {
co_await _db.get_notifier().unregister_listener(this);
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
_topo_sm.on_tablet_split_ready = nullptr;
co_await _lifecycle_notifier.unregister_subscriber(this);
co_await _tablet_load_stats_refresh.join();
// if topology_coordinator::run() is aborted either because we are not a
// leader anymore, or we are shutting down as a leader, we have to handle
@@ -4800,6 +4797,7 @@ future<> run_topology_coordinator(
topology_cmd_rpc_tracker};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);
try {
rtlogger.info("start topology coordinator fiber");
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4820,7 +4818,7 @@ future<> run_topology_coordinator(
}
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
}
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
co_await coordinator.stop();
}

View File

@@ -543,16 +543,11 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
// during SSTable writing and removed before sealing. If the write
// failed before sealing, the file may still be on disk and must be
// cleaned up explicitly.
// The component is only defined for the `ms` sstable format; for
// older formats it is absent from the component map and looking up
// its filename would throw std::out_of_range.
// Use file_exists() to avoid a C++ exception on the common path
// where the file was already removed before sealing.
if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) {
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
if (co_await file_exists(temp_hashes)) {
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
}
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
if (co_await file_exists(temp_hashes)) {
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
}
if (sync) {
co_await sst.sstable_write_io_check(sync_directory, dir_name.native());

View File

@@ -135,23 +135,7 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migr
}
future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
// _prepared_stmt is a checked_weak_ptr into the prepared statements
// cache and can be invalidated by a concurrent purge (e.g. on a schema
// change). cache_table_info() (re-)prepares and assigns _prepared_stmt,
// but the pin protecting the entry is dropped when try_prepare()
// returns. In release the chain of ready-future co_awaits back to here
// resumes synchronously, but debug builds preempt on every co_await
// even for ready futures, opening a window for a purge to drop the
// entry and leave _prepared_stmt null. Loop until a synchronous
// post-resume check finds _prepared_stmt valid; nothing can run between
// that check and the dereference below. _insert_stmt is a strong
// shared_ptr and is not affected by cache invalidation.
while (true) {
co_await cache_table_info(qp, mm, qs);
if (_prepared_stmt) {
break;
}
}
co_await cache_table_info(qp, mm, qs);
auto opts = opt_maker();
opts.prepare(_prepared_stmt->bound_names);
co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);

76
test.py
View File

@@ -11,11 +11,9 @@ from __future__ import annotations
import argparse
import asyncio
import dataclasses
import math
import shlex
import textwrap
from bisect import insort
from random import randint
import pytest
@@ -185,8 +183,6 @@ def parse_cmd_line() -> argparse.Namespace:
help="Specific byte limit for failure injection (random by default)")
parser.add_argument('--skip-internet-dependent-tests', action="store_true",
help="Skip tests which depend on artifacts from the internet.")
parser.add_argument('--keep-duplicates', action='store_true', default=False,
help="Do not deduplicate test arguments.")
parser.add_argument("--pytest-arg", action='store', type=str,
default=None, dest="pytest_arg",
help="Additional command line arguments to pass to pytest, for example ./test.py --pytest-arg=\"-v -x\"")
@@ -245,73 +241,6 @@ def parse_cmd_line() -> argparse.Namespace:
return args
# TODO: Remove _CollectionArgument and _deduplicate_test_args once we update
# to pytest 9.x, which fixes argument deduplication:
# https://github.com/pytest-dev/pytest/issues/12083
@dataclasses.dataclass(frozen=True, order=True)
class _CollectionArgument:
"""Resolved collection argument for deduplication.
A version-independent subset of pytest's CollectionArgument that
includes the fields needed for normalization (parametrization and
original_index were added in pytest 9.0).
``a in b`` means ``b`` subsumes (contains) ``a``. Adapted from
pytest 9.0.3 ``_pytest.main.is_collection_argument_subsumed_by``.
"""
path: pathlib.Path
parts: tuple[str, ...]
parametrization: str
original_index: int
def __contains__(self, other: _CollectionArgument) -> bool:
if self.path != other.path:
return not self.parts and other.path.is_relative_to(self.path)
if len(self.parts) > len(other.parts) or other.parts[:len(self.parts)] != self.parts:
return False
return not self.parametrization or self.parametrization == other.parametrization
def _deduplicate_test_args(args: list[str]) -> list[str]:
"""Remove duplicate and subsumed test arguments.
Resolves and normalizes CLI test arguments, then applies the normalization
algorithm from pytest 9.0.3 to remove exact duplicates and arguments whose
paths are contained within another argument's path.
For example, ``["test/cql", "test/cql/lua_test.cql"]`` becomes ``["test/cql"]``.
"""
if not args:
return args
invocation_path = pathlib.Path.cwd()
resolved_sorted: list[_CollectionArgument] = []
unresolved_indices: set[int] = set()
for i, arg in enumerate(args):
# Adapted from pytest 9.0.3 _pytest.main.resolve_collection_argument.
base, squacket, rest = arg.partition("[")
strpath, *parts = base.split("::")
fspath = pathlib.Path(os.path.abspath(invocation_path / strpath))
if not fspath.exists():
# Keep unresolved args — let pytest report the error.
unresolved_indices.add(i)
continue
insort(resolved_sorted, _CollectionArgument(
path=fspath,
parts=tuple(parts),
parametrization=squacket + rest,
original_index=i,
))
# Normalize: remove duplicates and subsumed arguments using an O(n log n)
# sort-based algorithm adapted from pytest 9.0.3.
normalized = resolved_sorted[:1]
for ca in resolved_sorted[1:]:
if ca not in normalized[-1]:
normalized.append(ca)
kept_indices = {ca.original_index for ca in normalized} | unresolved_indices
return [arg for i, arg in enumerate(args) if i in kept_indices]
def run_pytest(options: argparse.Namespace) -> int:
# When tests are executed in parallel on different hosts, we need to distinguish results from them.
# So HOST_ID needed to not overwrite results from different hosts during Jenkins will copy to one directory.
@@ -320,8 +249,7 @@ def run_pytest(options: argparse.Namespace) -> int:
report_dir = temp_dir / 'report'
junit_output_file = report_dir / f'pytest_cpp_{HOST_ID}.xml'
files_to_run = options.name if options.keep_duplicates else _deduplicate_test_args(options.name)
files_to_run = files_to_run or [str(TOP_SRC_DIR / 'test/')]
files_to_run = options.name or [str(TOP_SRC_DIR / 'test/')]
args = [
'--color=yes',
f'--repeat={options.repeat}',
@@ -341,8 +269,6 @@ def run_pytest(options: argparse.Namespace) -> int:
])
if options.verbose:
args.append('-v')
if options.keep_duplicates:
args.append('--keep-duplicates')
if options.quiet:
args.append('--quiet')
args.extend(['-p','no:sugar'])

View File

@@ -150,8 +150,6 @@ add_scylla_test(lister_test
KIND SEASTAR)
add_scylla_test(locator_topology_test
KIND SEASTAR)
add_scylla_test(lock_tables_metadata_test
KIND SEASTAR)
add_scylla_test(log_heap_test
KIND BOOST)
add_scylla_test(logalloc_standard_allocator_segment_pool_backend_test
@@ -376,7 +374,7 @@ add_scylla_test(combined_tests
sstable_compression_config_test.cc
sstable_directory_test.cc
sstable_set_test.cc
sstable_tablet_streaming_test.cc
sstable_tablet_streaming.cc
statement_restrictions_test.cc
storage_proxy_test.cc
tablets_test.cc

View File

@@ -1,36 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <fmt/format.h>
#include <seastar/core/with_timeout.hh>
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
using namespace std::chrono_literals;
// Test that two lock_tables_metadata calls don't deadlock
SEASTAR_TEST_CASE(test_lock_tables_metadata_deadlock) {
return do_with_cql_env_thread([](cql_test_env& e) {
try {
// Repeat the test scenario to increase the chance of hitting the deadlock.
// If no deadlock occurs, each repetition should complete within a fraction of a second,
// so even with 100 repetitions, the total test time should be reasonable.
for (int i = 0; i < 100; ++i) {
with_timeout(lowres_clock::now() + 30s,
when_all_succeed(
e.local_db().lock_tables_metadata(e.db()).discard_result(),
e.local_db().lock_tables_metadata(e.db()).discard_result()
)).get();
}
} catch (seastar::timed_out_error&) {
fmt::print(stderr, "FAIL: lock_tables_metadata deadlocked (timed out after 30s)\n");
_exit(1);
}
});
}

View File

@@ -246,33 +246,6 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
});
}
// Reproducer for SCYLLADB-1697
SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
return sstables::test_env::do_with_async([] (test_env& env) {
for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
testlog.info("Testing sstable version: {}", version);
auto sst = make_sstable_for_this_shard([&env, version] {
return env.make_sstable(test_table_schema(), version);
});
// Sanity: the TOC was written, otherwise the assertion below would be vacuous.
BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());
sst->unlink().get();
std::vector<sstring> remaining;
lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
[&remaining] (fs::path, directory_entry de) {
remaining.push_back(de.name);
return make_ready_future<>();
}).get();
BOOST_REQUIRE_MESSAGE(remaining.empty(),
fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
}
});
}
// Test the absence of TOC. Behavior is controllable by a flag
SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
return sstables::test_env::do_with_async([] (test_env& env) {

View File

@@ -11,11 +11,13 @@ from typing import TYPE_CHECKING
from cassandra.auth import PlainTextAuthProvider
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.cluster.dtest.ccmlib.common import logger
from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode
if TYPE_CHECKING:
from collections.abc import Iterable
from typing import Any
@@ -27,10 +29,6 @@ class ScyllaCluster:
self.manager = manager
self.scylla_mode = scylla_mode
self._config_options = {}
# Cached ScyllaNode instances. Nodes are appended by _add_nodes()
# in the order they are created by servers_add().
self._nodes: list[ScyllaNode] = []
self._next_node_num: int = 1
if self.scylla_mode == "debug":
self.default_wait_other_notice_timeout = 600
@@ -41,20 +39,19 @@ class ScyllaCluster:
self.force_wait_for_cluster_start = force_wait_for_cluster_start
def _add_nodes(self, servers: list) -> None:
"""Create ScyllaNode instances for the given servers and cache them."""
for server in servers:
name = f"node{self._next_node_num}"
self._next_node_num += 1
self._nodes.append(ScyllaNode(
cluster=self, server=server, name=name))
@staticmethod
def _sorted_nodes(servers: Iterable[ServerInfo]) -> list[ServerInfo]:
return sorted(servers, key=lambda s: s.server_id)
@property
def nodes(self) -> dict[str, ScyllaNode]:
return {node.name: node for node in self.nodelist()}
def nodelist(self) -> list[ScyllaNode]:
return list(self._nodes)
return [
ScyllaNode(cluster=self, server=server, name=f"node{n}")
for n, server in enumerate(self._sorted_nodes(self.manager.all_servers()), start=1)
]
def get_node_ip(self, nodeid: int) -> str:
return self.nodelist()[nodeid-1].address()
@@ -64,16 +61,16 @@ class ScyllaCluster:
self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra")
match nodes:
case int():
self._add_nodes(self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1"))
self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")
case list():
for dc, n_nodes in enumerate(nodes, start=1):
dc_name = f"dc{dc}"
self._add_nodes(self.manager.servers_add(
self.manager.servers_add(
servers_num=n_nodes,
config=self._config_options,
start=False,
auto_rack_dc=dc_name
))
)
case dict():
# Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}}
for dc, dc_nodes in nodes.items():
@@ -82,7 +79,7 @@ class ScyllaCluster:
for rack, rack_nodes in dc_nodes.items():
if not isinstance(rack_nodes, int):
raise RuntimeError(f"Unsupported topology specification: {nodes}")
self._add_nodes(self.manager.servers_add(
self.manager.servers_add(
servers_num=rack_nodes,
config=self._config_options,
property_file={
@@ -90,7 +87,7 @@ class ScyllaCluster:
"rack": rack,
},
start=False,
))
)
case _:
raise RuntimeError(f"Unsupported topology specification: {nodes}")
@@ -230,6 +227,11 @@ class ScyllaCluster:
def flush(self) -> None:
self.nodetool("flush")
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
for node in self.nodelist():
if node.is_running():
node.compact(keyspace=keyspace, tables=tables)
@staticmethod
def debug(message: str) -> None:
logger.debug(message)

View File

@@ -17,7 +17,6 @@ from itertools import chain
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any
import logging
from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
from test.pylib.internal_types import ServerUpState
@@ -29,9 +28,6 @@ if TYPE_CHECKING:
from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
logger = logging.getLogger("scylla_node")
NODETOOL_STDERR_IGNORED_PATTERNS = (
re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
re.compile(
@@ -115,6 +111,7 @@ class ScyllaNode:
self.data_center = server.datacenter
self.rack = server.rack
self._hostid = None
self._smp_set_during_test = None
self._smp = None
self._memory = None
@@ -153,20 +150,15 @@ class ScyllaNode:
return self.cluster.scylla_mode
def set_smp(self, smp: int) -> None:
logger.debug(f"Setting smp: {self=} {smp=}")
self._smp_set_during_test = smp
def smp(self) -> int:
logger.debug(f"Getting smp: {self=} _smp_set_during_test={self._smp_set_during_test} _smp={self._smp} {DEFAULT_SMP=}")
return self._smp_set_during_test or self._smp or DEFAULT_SMP
def memory(self) -> int:
return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU
def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
if not memory and not smp:
return
logger.debug(f"Adjusting smp={smp} memory={memory} current_smp={self._smp} current_memory={self._memory}")
if memory:
self._memory = memory // (smp or self.smp()) * self.smp()
if smp:
@@ -455,8 +447,6 @@ class ScyllaNode:
self.mark = self.mark_log()
logger.debug(f"Starting server: server_id={self.server_id} {scylla_args=} {scylla_env=}")
self.cluster.manager.server_start(
server_id=self.server_id,
seeds=None if self.bootstrap else [self.address()],
@@ -476,6 +466,9 @@ class ScyllaNode:
if wait_for_binary_proto:
self.wait_for_binary_interface(from_mark=self.mark)
if not self._hostid:
self.hostid()
if wait_other_notice:
timeout = self.cluster.default_wait_other_notice_timeout
for node, mark in marks:
@@ -658,11 +651,12 @@ class ScyllaNode:
cmd.append(table)
self.nodetool(" ".join(cmd), **kwargs)
def compact(self, keyspace: str = "", tables: str | None = ()) -> None:
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
compact_cmd = ["compact"]
if keyspace:
compact_cmd.append(keyspace)
compact_cmd += tables
if tables:
compact_cmd.extend(tables)
self.nodetool(" ".join(compact_cmd))
def drain(self, block_on_log: bool = False) -> None:
@@ -835,10 +829,13 @@ class ScyllaNode:
assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest
assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest
try:
return self.cluster.manager.get_host_id(server_id=self.server_id)
except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
if not self._hostid:
try:
self._hostid = self.cluster.manager.get_host_id(server_id=self.server_id)
except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
return self._hostid
def rmtree(self, path: str | Path) -> None:
"""Delete a directory content without removing the directory.

View File

@@ -34,6 +34,7 @@ def pytest_addoption(parser: Parser) -> None:
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None)
parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)")
parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster")
parser.addoption("--compaction-strategy", action="store", default=None, help="Compaction strategy to use in tests that support it (e.g. wide_rows_test.py). One of LeveledCompactionStrategy, SizeTieredCompactionStrategy, TimeWindowCompactionStrategy, or IncrementalCompactionStrategy. If not set, a random strategy is chosen per test.")
def pytest_configure(config: Config) -> None:

View File

@@ -1,46 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
import logging
import pytest
from dtest_class import Tester
logger = logging.getLogger(__file__)
@pytest.mark.single_node
class TestSetSmp(Tester):
"""Test that node.set_smp() properly persists across restarts."""
def _get_smp_from_log(self, node, from_mark=None):
"""Extract smp value from the node's log by looking at the SHARD_COUNT gossip value."""
matches = node.grep_log(r"SHARD_COUNT : Value\((\d+),\d+\)", from_mark=from_mark)
assert matches, "Could not find SHARD_COUNT in node log"
# Return the last match (most recent start)
return int(matches[-1][1].group(1))
def test_set_smp(self):
"""Verify that set_smp() takes effect on the next start."""
cluster = self.cluster
cluster.populate(1).start(wait_for_binary_proto=True)
node1 = cluster.nodelist()[0]
default_smp = self._get_smp_from_log(node1)
cluster.stop()
# set_smp to a different value and restart without jvm_args
target_smp = 1 if default_smp != 1 else 2
node1.set_smp(target_smp)
mark = node1.mark_log()
cluster.start(wait_for_binary_proto=True)
node1 = cluster.nodelist()[0]
actual_smp = self._get_smp_from_log(node1, from_mark=mark)
assert actual_smp == target_smp, \
f"Expected smp={target_smp} after set_smp({target_smp}), got {actual_smp}"

View File

@@ -263,3 +263,25 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key]))
assert sorted_list1 == sorted_list2
def assert_equal_more_with_deviation(actual, expect, deviation_perc):
"""
Assert actual is within inclusive interval [expected...expected+deviation_perc]
@param actual Value inspected
@param expect Beginning of expected interval
@param deviation_perc allowed percent increase
"""
deviation_high = (expect * (100 + deviation_perc)) / 100
assert expect <= actual <= deviation_high, f"Expect result interval {expect}..{deviation_high}, received {actual}"
def assert_less_equal_lists(actual_list, expected_list, msg=None):
"""
Assert actual_list is a subset of the expected list, prints hardcoded or parameterized error message
@param actual_list Inspected list
@param expected_list List that supposed to include actual_list
@param msg Configured message default None.
"""
standardMsg = msg or f"{actual_list} not less than or equal to {expected_list}"
assert set(actual_list) <= set(expected_list), standardMsg

File diff suppressed because it is too large Load Diff

View File

@@ -29,16 +29,12 @@ import pytest
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.connection import UnixSocketEndPoint
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.query import BatchStatement, BatchType, SimpleStatement, named_tuple_factory
from test.cluster.conftest import cluster_con
from test.cluster.dtest.dtest_class import create_ks, wait_for
from test.cluster.dtest.tools.assertions import assert_invalid
from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
from test.pylib.driver_utils import safe_driver_shutdown
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier
from test.pylib.skip_types import skip_env
@@ -277,7 +273,6 @@ class AuditEntry:
statement: str
table: str
user: str
source: str = "127.0.0.1"
class AuditBackend:
@@ -454,13 +449,6 @@ class AuditBackendSyslog(AuditBackend):
entries.append(self.line_to_row(line, idx))
return { self.audit_mode(): entries }
@staticmethod
def _parse_address(addr_port):
"""Extract IP from 'ip:port' (IPv4) or '[ip]:port' (IPv6)."""
if addr_port.startswith("["):
return addr_port[1:addr_port.index("]")]
return addr_port.split(":")[0]
def line_to_row(self, line, idx):
metadata, data = line.split(": ", 1)
data = "".join(data.splitlines()) # Remove newlines
@@ -472,9 +460,9 @@ class AuditBackendSyslog(AuditBackend):
# and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59)
date = datetime.datetime(2000, 1, 1, 0, 0)
node = self._parse_address(match.group("node"))
node = match.group("node").split(":")[0]
statement = match.group("query").replace("\\", "")
source = self._parse_address(match.group("client_ip"))
source = match.group("client_ip").split(":")[0]
event_time = uuid.UUID(int=idx)
t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username"))
return t
@@ -594,7 +582,6 @@ class CQLAuditTester(AuditTester):
user="anonymous",
cl="ONE",
error=False,
source="127.0.0.1",
):
self.assert_audit_row_fields(row)
assert row.node in self.server_addresses
@@ -603,7 +590,7 @@ class CQLAuditTester(AuditTester):
assert row.error == error
assert row.keyspace_name == ks
assert row.operation == statement
assert row.source == source
assert row.source == "127.0.0.1"
assert row.table_name == table
assert row.username == user
@@ -827,7 +814,7 @@ class CQLAuditTester(AuditTester):
sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
assert len(sorted_new_rows) == len(expected_entries)
for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error, entry.source)
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
async def verify_keyspace(self, audit_settings=None, helper=None):
"""
@@ -1867,44 +1854,6 @@ class CQLAuditTester(AuditTester):
finally:
session.execute("DROP KEYSPACE IF EXISTS kss")
# Unix domain sockets have no IP peer address. Seastar's
# socket_address::addr() falls through to the default case for
# AF_UNIX and returns a zero-initialised in6_addr, i.e. "::".
MAINTENANCE_SOCKET_SOURCE = "::"
async def _test_audit_maintenance_socket_user_creation(self, manager, helper_class):
with helper_class() as helper:
session = await self.prepare(
user="cassandra", password="cassandra",
helper=helper,
audit_settings={**helper.audit_default_settings, "audit_categories": "DCL", "audit_keyspaces": ""},
create_keyspace=False,
)
servers = await manager.running_servers()
server = servers[0]
socket_path = await manager.server_get_maintenance_socket_path(server.server_id)
logger.info("Connecting to maintenance socket")
endpoint = UnixSocketEndPoint(socket_path)
maint_cluster = cluster_con([endpoint],
load_balancing_policy=WhiteListRoundRobinPolicy([endpoint]))
maint_session = maint_cluster.connect()
role_name = "audit_test_admin"
create_stmt = f"CREATE ROLE {role_name} WITH PASSWORD = 'secret' AND SUPERUSER = true AND LOGIN = true"
expected_operation = f"CREATE ROLE {role_name} WITH PASSWORD = '***' AND SUPERUSER = true AND LOGIN = true"
logger.info("Creating superuser via maintenance socket and verifying audit entry")
expected_entries = [AuditEntry(category="DCL", statement=expected_operation,
user="anonymous", table="", ks="", cl="LOCAL_QUORUM", error=False,
source=self.MAINTENANCE_SOCKET_SOURCE)]
with self.assert_entries_were_added(session, expected_entries):
maint_session.execute(create_stmt)
logger.info("Cleaning up created role")
maint_session.execute(f"DROP ROLE IF EXISTS {role_name}")
safe_driver_shutdown(maint_cluster)
# AuditBackendTable, no auth, rf=1
@@ -1997,14 +1946,6 @@ async def test_service_level_statements_standalone(manager: ManagerClient):
await CQLAuditTester(manager)._test_service_level_statements()
async def test_audit_maintenance_socket_user_creation(manager: ManagerClient):
"""Verify that creating a superuser via the maintenance socket is audited."""
t = CQLAuditTester(manager)
await t._test_audit_maintenance_socket_user_creation(manager, AuditBackendTable)
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
await t._test_audit_maintenance_socket_user_creation(manager, Syslog)
# AuditBackendSyslog, no auth, rf=1
async def test_audit_syslog_noauth(manager: ManagerClient):

View File

@@ -48,5 +48,6 @@ run_in_dev:
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test
- dtest/wide_rows_test
run_in_debug:
- random_failures/test_random_failures

View File

@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts
from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,30 +880,41 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
class _LeadershipTransferred(Exception):
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
pass
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
Returns the next current_key value.
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
restart, signalling the caller to retry.
"""
# Ensure servers[1] is not the topology coordinator. If the coordinator is
# restarted, the Raft leader dies, a new election occurs, and the new
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
# and marking post-repair data as repaired. That legitimate re-repair masks
# the compaction-merge bug this test detects.
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
if coord_serv == servers[1]:
other = next(s for s in servers if s != servers[1])
await ensure_group0_leader_on(manager, other)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark()
@@ -967,16 +978,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
await manager.server_start(target.server_id)
await manager.servers_see_each_other(servers)
# Check if leadership transferred to servers[1] during the restart.
# If so, the new coordinator will re-initiate repair, masking the bug.
new_coord = await get_topology_coordinator(manager)
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
if new_coord_serv == servers[1]:
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
await manager.api.wait_task(servers[0].ip_addr, task_id)
raise _LeadershipTransferred(
"servers[1] became topology coordinator after restart")
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
# confirming that the bug was triggered (S1' and E merged during the race window).
deadline = time.time() + 60
@@ -999,7 +1000,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
if not compaction_ran:
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
"the bug was not triggered. Skipping assertion.")
return current_key
return
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1030,9 +1031,8 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
# servers[0] and servers[2] were never restarted and the coordinator stayed
# alive throughout, so no re-repair could have flushed their memtables.
# Post-repair keys must NOT appear in repaired sstables on these servers.
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,54 +1053,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
return current_key
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
# If leadership transfers to servers[1] between our coordinator check and the
# restart, the coordinator change masks the bug. Detect and retry.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
current_key = await _do_race_window_promotes_unrepaired_data(
manager, servers, cql, ks, token, scylla_path, current_key)
return
except _LeadershipTransferred as e:
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
"could not run the test without coordinator interference.")
# ----------------------------------------------------------------------------
# Tombstone GC safety tests

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
from test.cluster.util import get_topology_coordinator, trigger_stepdown
import pytest
import logging
@@ -83,78 +83,3 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager)
if coord3:
break
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
"""Verify that _tablet_load_stats_refresh is properly joined during
topology coordinator shutdown, even when a schema change notification
triggers a refresh between run() completing and stop() being called.
Reproduces the scenario using two injection points:
- topology_coordinator_pause_before_stop: pauses after run() finishes
but before stop() is called
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
so it's still in-flight during shutdown
Without the join in stop(), the refresh task outlives the coordinator
and accesses freed memory.
"""
servers = await manager.servers_add(3)
await manager.get_ready_cql(servers)
async with new_test_keyspace(manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
coord = await get_topology_coordinator(manager)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
coord_idx = host_ids.index(coord)
coord_server = servers[coord_idx]
log = await manager.server_open_log(coord_server.server_id)
mark = await log.mark()
# Injection B: pause between run() returning and stop() being called.
await manager.api.enable_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
# Stepdown causes the topology coordinator to abort and shut down.
logger.info("Triggering stepdown on coordinator")
await trigger_stepdown(manager, coord_server)
# Wait for injection B to fire. The coordinator has finished run() but
# the schema change listener is still registered.
mark, _ = await log.wait_for(
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
# Enable it now so it only catches the notification-triggered call.
await manager.api.enable_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
# CREATE TABLE fires on_create_column_family on the old coordinator which
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
# via with_scheduling_group on the gossip scheduling group.
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
# Release injection B: coordinator proceeds through stop().
# Without the fix, stop() returns quickly and run_topology_coordinator
# frees the topology_coordinator frame. With the fix, stop() blocks at
# _tablet_load_stats_refresh.join() until injection A is released.
logger.info("Releasing injection B: coordinator will stop")
await manager.api.message_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
# Release injection A: refresh_tablet_load_stats() resumes and accesses
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
# points to freed memory and ASan detects heap-use-after-free.
logger.info("Releasing injection A: refresh resumes")
await manager.api.message_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
# If the bug is present, the node crashed. read_barrier will fail.
await read_barrier(manager.api, coord_server.ip_addr)

View File

@@ -435,9 +435,8 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -452,44 +451,43 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
await cql.run_async("create table ks2.t (pk int primary key);")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert repl['dc1'] == '1'
assert repl['dc2'] == '2'
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks3.t (pk int primary key);")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
repl = await get_replication_options("ks3")
assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks4.t (pk int primary key);")
repl = await get_replication_options("ks4", host, servers[0].ip_addr)
repl = await get_replication_options("ks4")
assert repl['dc1'] == '1'
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks5.t (pk int primary key);")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
repl = await get_replication_options("ks5")
assert repl['dc1'] == '2'
assert repl['dc2'] == '2'
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks6.t (pk int primary key);")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
repl = await get_replication_options("ks6")
assert repl['dc1'] == '2'
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b']
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -499,7 +497,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
assert r.replicas[0][0] == host_ids[1]
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert repl['dc1'] == ['rack1a']
assert len(repl['dc2']) == 2
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -525,13 +523,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
pass
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
repl = await get_replication_options("ks5")
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
assert repl['dc2'] == '2'
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
repl = await get_replication_options("ks6")
assert repl['dc1'] == '2'
assert len(repl['dc2']) == 1
assert repl['dc2'][0] == 'rack2a'
@@ -539,9 +537,8 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -554,11 +551,10 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -578,19 +574,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
servers = servers[0:-1]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b']
logging.info("Rolling restart")
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])
await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
repl = await get_replication_options("ks3")
assert len(repl['dc1']) == 1
assert len(repl['dc2']) == 1
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -606,7 +602,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
assert failed
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert len(repl['dc1']) == 1
assert repl['dc1'][0] == 'rack1b'
assert len(repl['dc2']) == 1
@@ -1113,9 +1109,8 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -1133,11 +1128,10 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1171,7 +1165,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
failed = True
assert failed
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
@pytest.mark.asyncio

View File

@@ -18,7 +18,6 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "ldap_common.hh"
#include "service/migration_manager.hh"
@@ -682,41 +681,3 @@ SEASTAR_TEST_CASE(ldap_config) {
},
make_ldap_config());
}
// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
auto cfg = make_ldap_config();
cfg->permissions_update_interval_in_ms.set(1);
auto call_count = seastar::make_lw_shared<int>(0);
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
auto& cache = env.auth_cache().local();
testlog.info("Populating 50 cache entries");
for (int i = 0; i < 50; i++) {
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
cache.get_permissions(auth::role_or_anonymous(), r).get();
}
testlog.info("Installing slow permission loader (10ms per call)");
cache.set_permission_loader(
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
-> seastar::future<auth::permission_set> {
++(*call_count);
co_await seastar::sleep(std::chrono::milliseconds(10));
co_return auth::permission_set();
});
testlog.info("Waiting for pruner to start reloading");
while (*call_count == 0) {
seastar::sleep(std::chrono::milliseconds(1)).get();
}
testlog.info("Pruner started, letting teardown run");
}, cfg);
testlog.info("Loader called {} times", *call_count);
}

View File

@@ -371,13 +371,9 @@ int scylla_simple_query_main(int argc, char** argv) {
audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) {
fmt::print("audit start failed: {}", e);
}).get();
audit::audit::start_storage(env.local_db().get_config()).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
auto results = do_cql_test(env, cfg);
aggregated_perf_results agg(results);
std::cout << agg << std::endl;

View File

@@ -72,12 +72,16 @@ class CppFile(pytest.File, ABC):
self.test_name = self.path.stem
# Implement following properties as cached_property because they are read-only, and based on stash items which
# will be assigned in test/pylib/runner.py::pytest_collect_file() and modify_pytest_item() after instance creation.
# will be assigned in test/pylib/runner.py::pytest_collect_file() hook after a CppFile instance was created.
@cached_property
def build_mode(self) -> str:
return self.stash[BUILD_MODE]
@cached_property
def run_id(self) -> int:
return self.stash[RUN_ID]
@cached_property
def suite_config(self) -> dict[str, Any]:
return self.stash[TEST_SUITE].cfg
@@ -162,13 +166,9 @@ class CppTestCase(pytest.Item):
self.own_markers = [getattr(pytest.mark, mark_name) for mark_name in own_markers]
self.add_marker(pytest.mark.cpp)
@cached_property
def run_id(self) -> int:
return self.stash[RUN_ID]
def get_artifact_path(self, extra: str = "", suffix: str = "") -> pathlib.Path:
return self.parent.log_dir / ".".join(
(self.path.relative_to(TEST_DIR).with_suffix("") / f"{self.name}{extra}.{self.run_id}{suffix}").parts
(self.path.relative_to(TEST_DIR).with_suffix("") / f"{self.name}{extra}.{self.parent.run_id}{suffix}").parts
)
def make_testpy_test_object_mock(self) -> SimpleNamespace:
@@ -179,7 +179,7 @@ class CppTestCase(pytest.Item):
return SimpleNamespace(
time_end=0,
time_start=0,
id=self.run_id,
id=self.parent.run_id,
mode=self.parent.build_mode,
success=False,
shortname=self.name,

View File

@@ -15,7 +15,7 @@ import random
import sys
from argparse import BooleanOptionalAction
from collections import defaultdict
from itertools import chain, count
from itertools import chain, count, product
from functools import cache, cached_property
from pathlib import Path
from random import randint
@@ -106,7 +106,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
" '--logger-log-level raft=trace --default-log-level error'")
parser.addoption('--x-log2-compaction-groups', action="store", default="0", type=int,
help="Controls number of compaction groups to be used by Scylla tests. Value of 3 implies 8 groups.")
parser.addoption('--repeat', action="store", default=1, type=int,
parser.addoption('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution")
# Pass information about Scylla node from test.py to pytest.
@@ -162,10 +162,9 @@ def scylla_binary(testpy_test) -> Path:
return testpy_test.suite.scylla_exe
def pytest_collection_modifyitems(items: list[pytest.Item], config: pytest.Config) -> None:
run_ids = defaultdict(lambda: count(start=int(config.getoption("--run_id") or 1)))
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
for item in items:
modify_pytest_item(item=item, run_ids=run_ids)
modify_pytest_item(item=item)
suites_order = defaultdict(count().__next__) # number suites in order of appearance
@@ -286,10 +285,7 @@ def pytest_configure(config: pytest.Config) -> None:
pytest_log_dir.mkdir(parents=True, exist_ok=True)
if not _pytest_config.getoption("--save-log-on-success"):
for file in pytest_log_dir.glob("*"):
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them.
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
# condition, especially with repeat.
file.unlink(missing_ok=True)
file.unlink()
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
@@ -317,15 +313,14 @@ def pytest_configure(config: pytest.Config) -> None:
os.environ["TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED"] = os.environ.get("TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED", str(random.randint(0, sys.maxsize)))
config.build_modes = get_modes_to_run(config)
repeat = int(config.getoption("--repeat"))
if testpy_run_id := config.getoption("--run_id"):
if config.getoption("--repeat") != 1:
if repeat != 1:
raise RuntimeError("Can't use --run_id and --repeat simultaneously.")
class DisabledFile(pytest.File):
def collect(self) -> list[pytest.Item]:
pytest.skip("All tests in this file are disabled in requested modes according to the suite config.")
config.run_ids = (testpy_run_id,)
else:
config.run_ids = tuple(range(1, repeat + 1))
@pytest.hookimpl(wrapper=True)
@@ -342,16 +337,19 @@ def pytest_collect_file(file_path: pathlib.Path,
mode for mode in build_modes
if not suite_config.is_test_disabled(build_mode=mode, path=file_path)
)
if repeats := [mode for mode in build_modes for _ in range(parent.config.getoption("--repeat"))]:
ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable(
ihook.pytest_collect_file(file_path=file_path, parent=parent) for _ in range(1, len(repeats))
)))
for build_mode, collector in zip(repeats, collectors, strict=True):
collector.stash[BUILD_MODE] = build_mode
collector.stash[TEST_SUITE] = suite_config
else:
collectors = [DisabledFile.from_parent(parent=parent, path=file_path)]
repeats = list(product(build_modes, parent.config.run_ids))
if not repeats:
return []
ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable(
ihook.pytest_collect_file(file_path=file_path, parent=parent) for _ in range(1, len(repeats))
)))
for (build_mode, run_id), collector in zip(repeats, collectors, strict=True):
collector.stash[BUILD_MODE] = build_mode
collector.stash[RUN_ID] = run_id
collector.stash[TEST_SUITE] = suite_config
parent.stash[REPEATING_FILES].remove(file_path)
@@ -425,7 +423,7 @@ class TestSuiteConfig:
TEST_SUITE = pytest.StashKey[TestSuiteConfig | None]()
_STASH_KEYS_TO_COPY = BUILD_MODE, TEST_SUITE
_STASH_KEYS_TO_COPY = BUILD_MODE, RUN_ID, TEST_SUITE
def get_params_stash(node: _pytest.nodes.Node) -> pytest.Stash | None:
@@ -435,12 +433,11 @@ def get_params_stash(node: _pytest.nodes.Node) -> pytest.Stash | None:
return parent.stash
def modify_pytest_item(item: pytest.Item, run_ids: defaultdict[tuple[str, str], count]) -> None:
def modify_pytest_item(item: pytest.Item) -> None:
params_stash = get_params_stash(node=item)
for key in _STASH_KEYS_TO_COPY:
item.stash[key] = params_stash[key]
item.stash[RUN_ID] = next(run_ids[(item.stash[BUILD_MODE], item._nodeid)])
suffix = f".{item.stash[BUILD_MODE]}.{item.stash[RUN_ID]}"

View File

@@ -75,7 +75,6 @@ def test_no_bare_skip_markers_in_collection():
"--collect-only",
"--ignore=boost", "--ignore=raft",
"--ignore=ldap", "--ignore=vector_search",
"--ignore=unit",
"-p", "no:sugar"],
capture_output=True, text=True,
cwd=str(_TEST_ROOT),