Compare commits

..

13 Commits

Author SHA1 Message Date
Benny Halevy
211eb7c32b test/cluster/dtest: wide_rows_test.py: use prepared statements
Replace string-formatted CQL queries in loops with prepared
statements and bind parameters. This avoids repeated query parsing
on the server side and eliminates CQL injection risk from string
interpolation.

Functions converted:
- test_column_index_stress: INSERT (100k iterations) and SELECT (10k)
- create_large_partition_data: UPDATE with TIMESTAMP
- create_large_row_data: UPDATE per column
- create_too_many_rows_data: UPDATE for columns and collections
- delete_too_many_rows_data: DELETE for columns and collections
- create_large_row_static_data: INSERT
- set_ttl_on_few_rows_in_partition: SELECT and UPDATE with TTL
- set_ttl_on_few_large_rows: SELECT and UPDATE with TTL
2026-04-28 11:40:16 +03:00
Benny Halevy
3a640a9ff0 test/cluster/dtest: wide_rows_test.py: randomize compaction strategy
Replace parametrized compaction strategy (4 strategies × 31 tests =
124 test cases) with random selection per test. This reduces the
test count to 31 while still covering all strategies over time.

Add --compaction-strategy option to allow reproducing failures with
a specific strategy, e.g.:
  ./test.py --mode=dev test/cluster/dtest/wide_rows_test.py \
    --pytest-arg="--compaction-strategy=LeveledCompactionStrategy"
2026-04-28 11:40:16 +03:00
Benny Halevy
8288f87beb test/cluster/dtest: wide_rows_test.py: reduce TTL sleep time
Reduce TTL from 60 to 1 second and sleep time from ttl+5 to ttl+1
in set_ttl_on_few_rows_in_partition() and set_ttl_on_few_large_rows().
The original 60-second TTL was unnecessarily high, adding over a
minute of idle wait time per TTL test invocation.
2026-04-28 11:40:16 +03:00
Benny Halevy
77c03354f4 test/cluster/dtest: wide_rows_test.py: fix key_appearance accumulation
In validate_entities_recognized_as_large(), key_appearance was
overwritten on each loop iteration instead of being accumulated.
This meant that for entity_type == "cell" in multi-node clusters,
entities_count only reflected the last node's count rather than
the total across all nodes. Fix by using += to accumulate.

Update expected_entity_number in test_large_cell_in_materialized_view
to account for RF=3 replication (each cell appears on all 3 nodes).

Bug inherited from scylla-dtest.
2026-04-28 11:40:16 +03:00
Benny Halevy
49df9242f7 test/cluster/dtest: wide_rows_test.py: scope compact to test keyspace/table
Pass KEYSPACE_NAME and TABLE_NAME to cluster.compact() instead of
compacting all keyspaces. This avoids unnecessary compaction of
system tables, making tests faster.

Also convert remaining nodetool("compact ...") calls to use
cluster.compact() for consistency.
2026-04-28 11:40:16 +03:00
Benny Halevy
8c82c6646b test/cluster/dtest: wide_rows_test.py: fix expect_warning mutation across nodes
In validate_log_warnings(), expect_warning was reassigned inside the
per-node loop, so if the first node set it to False (due to no
sstables on disk), all subsequent nodes would inherit that value
regardless of their own state.

Use a local variable (node_expect_warning) instead of mutating the
function parameter.
2026-04-28 11:40:16 +03:00
Benny Halevy
d76d0b8a16 test/cluster/dtest: wide_rows_test.py: remove dead code
Remove validation_small_entity() and get_large_entity_info() methods.
These are not called by any test in the migrated file.
get_large_entity_info() also had a bug where the CQL query used
escaped braces ({{keyspace_name}}) instead of actual parameter
substitution, so it would have queried for literal '{keyspace_name}'.
2026-04-28 11:40:16 +03:00
Benny Halevy
690672e4cb test/cluster/dtest: wide_rows_test.py: cosmetic cleanups
Fix typos: aproximately, quering, colection, table_nam, the the.
Fix grammar: 'verify the they didn't recognized as large'.
Use idiomatic 'not in' instead of 'not x in'.
Remove unused variable assignment and commented-out debug line.
Remove unnecessary f-string prefix.
Fix '/n' to use actual newline in error message formatting.
Fix extra trailing quotes in exception messages.
Remove redundant variable assignment (maximum_primary_key_value).
2026-04-28 11:40:13 +03:00
Benny Halevy
85079d7c7a test/cluster/dtest: migrate wide_rows_test.py from scylla-dtest
Adapt wide_rows_test.py to work with the in-tree cluster test
framework:
- Replace dtest imports with in-tree equivalents
- Replace self.cluster.flush() + self.cluster.wait_for_compactions()
  with self.cluster.compact() since nodetool compact handles flush
  and waiting internally
- Add inline wait_for_view() helper (replaces async version)
- Replace node.status with is_running() check
- Add copyright header

Remove from skip_in_dev now that all tests pass.
2026-04-28 11:39:47 +03:00
Benny Halevy
70f8fcbe67 test/cluster/dtest: cache ScyllaNode hostid
Cache the host ID in ScyllaNode._hostid so that hostid() returns
the cached value when the node is stopped.  Without this,
watch_log_for_death() fails with a timeout because it tries to
query the stopped node's API to get its host ID for the log
pattern match.
2026-04-28 11:36:08 +03:00
Benny Halevy
1d6403ddad test/cluster/dtest: add ScyllaCluster.compact() method
Add compact() method to ScyllaCluster, delegating to
ScyllaNode.compact() on each running node. Accepts optional
keyspace and tables parameters to allow scoping compaction to
specific keyspaces/tables.

Also fix ScyllaNode.compact() to use list[str] for tables
parameter and extend() instead of +=, so that passing a single
table name as a string does not iterate over its characters.
2026-04-28 11:36:08 +03:00
Benny Halevy
9a24be2fe9 test/cluster/dtest: add assertion helpers for wide_rows_test
Add assert_equal_more_with_deviation() and assert_less_equal_lists()
to tools/assertions.py.  These are needed by the wide_rows_test.py
migration from scylla-dtest.
2026-04-28 11:36:08 +03:00
Benny Halevy
5c93ccb6d8 test/cluster/dtest: copy wide_rows_test.py verbatim from scylla-dtest
Copy wide_rows_test.py as-is from scylla-dtest. The test is added
to run_in_dev but also skip_in_dev in test_config.yaml since it
requires functional changes to work with the in-tree test
framework. The next commit will make the necessary changes and
remove it from skip_in_dev.
2026-04-28 11:36:08 +03:00
36 changed files with 1691 additions and 763 deletions

View File

@@ -194,36 +194,22 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
std::move(audited_keyspaces), std::move(audited_keyspaces),
std::move(audited_tables), std::move(audited_tables),
std::move(audited_categories), std::move(audited_categories),
std::cref(cfg)); std::cref(cfg))
} .then([&cfg] {
if (!audit_instance().local_is_initialized()) {
future<> audit::start_storage(const db::config& cfg) { return make_ready_future<>();
if (!audit_instance().local_is_initialized()) { }
return make_ready_future<>(); return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
} return local_audit.start(cfg);
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit._storage_helper_ptr->start(cfg).then([&local_audit] {
local_audit._storage_running = true;
}); });
}); });
} }
future<> audit::stop_storage() {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([] (audit& local_audit) {
local_audit._storage_running = false;
return local_audit._storage_helper_ptr->stop();
});
}
future<> audit::stop_audit() { future<> audit::stop_audit() {
if (!audit_instance().local_is_initialized()) { if (!audit_instance().local_is_initialized()) {
return make_ready_future<>(); return make_ready_future<>();
} }
return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) { return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) {
SCYLLA_ASSERT(!local_audit._storage_running);
return local_audit.shutdown(); return local_audit.shutdown();
}).then([] { }).then([] {
return audit::audit::audit_instance().stop(); return audit::audit::audit_instance().stop();
@@ -237,6 +223,14 @@ audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& k
return std::make_unique<audit_info>(cat, keyspace, table, batch); return std::make_unique<audit_info>(cat, keyspace, table, batch);
} }
future<> audit::start(const db::config& cfg) {
return _storage_helper_ptr->start(cfg);
}
future<> audit::stop() {
return _storage_helper_ptr->stop();
}
future<> audit::shutdown() { future<> audit::shutdown() {
return make_ready_future<>(); return make_ready_future<>();
} }
@@ -247,12 +241,6 @@ future<> audit::log(const audit_info& audit_info, const service::client_state& c
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username; const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
socket_address client_ip = client_state.get_client_address().addr(); socket_address client_ip = client_state.get_client_address().addr();
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
audit_info.query(), client_ip, audit_info.table(), username));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) { if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}", logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(), node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
@@ -298,11 +286,6 @@ future<> inspect(const audit_info_alternator& ai, const service::client_state& c
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept { future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}",
node_ip, client_ip, username, error ? "true" : "false"));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) { if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}", logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
node_ip, client_ip, username, error ? "true" : "false"); node_ip, client_ip, username, error ? "true" : "false");

View File

@@ -141,7 +141,6 @@ private:
category_set _audited_categories; category_set _audited_categories;
std::unique_ptr<storage_helper> _storage_helper_ptr; std::unique_ptr<storage_helper> _storage_helper_ptr;
bool _storage_running = false;
const db::config& _cfg; const db::config& _cfg;
utils::observer<sstring> _cfg_keyspaces_observer; utils::observer<sstring> _cfg_keyspaces_observer;
@@ -164,8 +163,6 @@ public:
return audit_instance().local(); return audit_instance().local();
} }
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm); static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> start_storage(const db::config& cfg);
static future<> stop_storage();
static future<> stop_audit(); static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false); static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false);
audit(locator::shared_token_metadata& stm, audit(locator::shared_token_metadata& stm,
@@ -177,6 +174,8 @@ public:
category_set&& audited_categories, category_set&& audited_categories,
const db::config& cfg); const db::config& cfg);
~audit(); ~audit();
future<> start(const db::config& cfg);
future<> stop();
future<> shutdown(); future<> shutdown();
bool should_log(const audit_info& audit_info) const; bool should_log(const audit_info& audit_info) const;
bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const; bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const;

View File

@@ -258,11 +258,13 @@ future<> ldap_role_manager::start() {
} catch (const seastar::sleep_aborted&) { } catch (const seastar::sleep_aborted&) {
co_return; // ignore co_return; // ignore
} }
try { co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
co_await _cache.reload_all_permissions(); try {
} catch (...) { co_await c.reload_all_permissions();
mylog.warn("Cache reload all permissions failed: {}", std::current_exception()); } catch (...) {
} mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
});
} }
}); });
return _std_mgr.start(); return _std_mgr.start();

View File

@@ -157,20 +157,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
return create_legacy_keyspace_if_missing(mm); return create_legacy_keyspace_if_missing(mm);
}); });
} }
// Authorizer must be started before the permission loader is set,
// because the loader calls _authorizer->authorize().
// The loader must be set before starting the role manager, because
// LDAP role manager starts a pruner fiber that calls
// reload_all_permissions() which asserts _permission_loader is set.
co_await _authorizer->start();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
co_await _role_manager->start(); co_await _role_manager->start();
if (this_shard_id() == 0) { if (this_shard_id() == 0) {
// Role manager and password authenticator have this odd startup // Role manager and password authenticator have this odd startup
@@ -179,19 +165,21 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
// creation therefore we need to wait here. // creation therefore we need to wait here.
co_await _role_manager->ensure_superuser_is_created(); co_await _role_manager->ensure_superuser_is_created();
} }
// Authenticator must be started after ensure_superuser_is_created() co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
// because password_authenticator queries system.roles for the if (!_used_by_maintenance_socket) {
// superuser entry created by the role manager. // Maintenance socket mode can't cache permissions because it has
co_await _authenticator->start(); // different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
} }
future<> service::stop() { future<> service::stop() {
_as.request_abort(); _as.request_abort();
// Reverse of start() order.
co_await _authenticator->stop();
co_await _role_manager->stop();
_cache.set_permission_loader(nullptr); _cache.set_permission_loader(nullptr);
co_await _authorizer->stop(); return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
} }
future<> service::ensure_superuser_is_created() { future<> service::ensure_superuser_is_created() {

View File

@@ -593,7 +593,6 @@ scylla_tests = set([
'test/boost/linearizing_input_stream_test', 'test/boost/linearizing_input_stream_test',
'test/boost/lister_test', 'test/boost/lister_test',
'test/boost/locator_topology_test', 'test/boost/locator_topology_test',
'test/boost/lock_tables_metadata_test',
'test/boost/log_heap_test', 'test/boost/log_heap_test',
'test/boost/logalloc_standard_allocator_segment_pool_backend_test', 'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
'test/boost/logalloc_test', 'test/boost/logalloc_test',
@@ -1711,7 +1710,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/sstable_compression_config_test.cc', 'test/boost/sstable_compression_config_test.cc',
'test/boost/sstable_directory_test.cc', 'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc', 'test/boost/sstable_set_test.cc',
'test/boost/sstable_tablet_streaming_test.cc', 'test/boost/sstable_tablet_streaming.cc',
'test/boost/statement_restrictions_test.cc', 'test/boost/statement_restrictions_test.cc',
'test/boost/storage_proxy_test.cc', 'test/boost/storage_proxy_test.cc',
'test/boost/tablets_test.cc', 'test/boost/tablets_test.cc',

View File

@@ -399,10 +399,9 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
} }
} }
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map)); gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
auto ack2_msg_str = fmt::format("{}", ack2_msg); logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg)); co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str); logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
} }
// Depends on // Depends on

46
main.cc
View File

@@ -1810,18 +1810,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
utils::get_local_injector().inject("stop_after_starting_migration_manager", utils::get_local_injector().inject("stop_after_starting_migration_manager",
[] { std::raise(SIGSTOP); }); [] { std::raise(SIGSTOP); });
// Audit must be constructed before the maintenance socket so
// that on shutdown (reverse destruction order) the audit service
// outlives the maintenance socket and in-flight queries can
// still reach audit::inspect() safely.
checkpoint(stop_signal, "starting audit service");
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
// XXX: stop_raft has to happen before query_processor and migration_manager // XXX: stop_raft has to happen before query_processor and migration_manager
// is stopped, since some groups keep using the query // is stopped, since some groups keep using the query
// processor until are stopped inside stop_raft. // processor until are stopped inside stop_raft.
@@ -2352,22 +2340,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get(); }).get();
stop_signal.ready(false); stop_signal.ready(false);
// At this point, `locator::topology` should be stable, i.e. we should have complete information
// about the layout of the cluster (= list of nodes along with the racks/DCs).
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(token_metadata.local().get());
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
// The table-based audit backend needs Raft (via join_cluster)
// to create its keyspace and table.
checkpoint(stop_signal, "starting audit storage");
audit::audit::start_storage(*cfg).get();
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
if (cfg->maintenance_socket() != "ignore") { if (cfg->maintenance_socket() != "ignore") {
// Enable role operations now that node joined the cluster // Enable role operations now that node joined the cluster
maintenance_auth_service.invoke_on_all([](auth::service& svc) { maintenance_auth_service.invoke_on_all([](auth::service& svc) {
@@ -2377,6 +2349,24 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server"); start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
} }
// At this point, `locator::topology` should be stable, i.e. we should have complete information
// about the layout of the cluster (= list of nodes along with the racks/DCs).
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(token_metadata.local().get());
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
// Start audit service after join_cluster so that the table-based audit backend
// can properly create its keyspace and table.
checkpoint(stop_signal, "starting audit service");
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
// Semantic validation of sstable compression parameters from config. // Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the // Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated. // required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.

View File

@@ -1328,27 +1328,9 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,
future<tables_metadata_lock_on_all_shards> database::lock_tables_metadata(sharded<database>& sharded_db) { future<tables_metadata_lock_on_all_shards> database::lock_tables_metadata(sharded<database>& sharded_db) {
tables_metadata_lock_on_all_shards locks; tables_metadata_lock_on_all_shards locks;
// Acquire write lock on shard 0 first, and then on the remaining shards. co_await sharded_db.invoke_on_all([&] (auto& db) -> future<> {
//
// Parallel acquisition on all shards could deadlock when two
// fibers call lock_tables_metadata() concurrently: parallel_for_each
// sends SMP messages to all shards even when the local shard's lock
// attempt blocks. If task reordering (SEASTAR_SHUFFLE_TASK_QUEUE in
// debug/sanitize builds) causes fiber A to win on shard X while
// fiber B wins on shard Y, neither can make progress — classic
// cross-shard lock-ordering deadlock.
//
// Acquiring the write lock on shard 0 first, and then on the remaining
// shards, eliminates this: whichever fiber acquires shard 0 first is
// guaranteed to acquire locks on all other shards before the other fiber
// can acquire the lock on shard 0.
co_await sharded_db.invoke_on(0, [&locks, &sharded_db] (auto& db) -> future<> {
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock()); locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
co_await sharded_db.invoke_on_others([&locks] (auto& db) -> future<> {
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
});
}); });
co_return locks; co_return locks;
} }

View File

@@ -5466,9 +5466,10 @@ class scylla_compaction_tasks(gdb.Command):
try: try:
task_list = list(intrusive_list(cm['_tasks'])) task_list = list(intrusive_list(cm['_tasks']))
except gdb.error: # 6.2 compatibility except gdb.error: # 6.2 compatibility
task_list = [seastar_shared_ptr(t).get().dereference() for t in std_list(cm['_tasks'])] task_list = list(std_list(cm['_tasks']))
for task in task_list: for task in task_list:
task = seastar_shared_ptr(task).get().dereference()
schema = schema_ptr(task['_compacting_table'].dereference()['_schema']) schema = schema_ptr(task['_compacting_table'].dereference()['_schema'])
key = 'type={}, state={:5}, {}'.format(task['_type'], str(task['_state']), schema.table_name()) key = 'type={}, state={:5}, {}'.format(task['_type'], str(task['_state']), schema.table_name())
task_hist.add(key) task_hist.add(key)

View File

@@ -438,10 +438,9 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect()); const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect());
auto ps_ptr = qp.get_prepared(cache_key); auto ps_ptr = qp.get_prepared(cache_key);
shared_ptr<cql_transport::messages::result_message::prepared> prepared_msg;
if (!ps_ptr) { if (!ps_ptr) {
prepared_msg = co_await qp.prepare(req, qs, cql3::internal_dialect()); const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = prepared_msg->get_prepared(); ps_ptr = msg_ptr->get_prepared();
if (!ps_ptr) { if (!ps_ptr) {
on_internal_error(paxos_state::logger, "prepared statement is null"); on_internal_error(paxos_state::logger, "prepared statement is null");
} }
@@ -450,8 +449,8 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
-1, service::node_local_only::yes); -1, service::node_local_only::yes);
const auto st = ps_ptr->statement; const auto st = ps_ptr->statement;
const auto result_ptr = co_await st->execute(qp, qs, qo, std::nullopt); const auto msg_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
co_return cql3::untyped_result_set(result_ptr); co_return cql3::untyped_result_set(msg_ptr);
} }
template <typename... Args> template <typename... Args>

View File

@@ -4237,7 +4237,6 @@ public:
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker) , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator") , _async_gate("topology_coordinator")
{ {
_lifecycle_notifier.register_subscriber(this);
_db.get_notifier().register_listener(this); _db.get_notifier().register_listener(this);
// When the delay_cdc_stream_finalization error injection is disabled // When the delay_cdc_stream_finalization error injection is disabled
// (test releases it), wake the topology coordinator so it retries // (test releases it), wake the topology coordinator so it retries
@@ -4401,7 +4400,6 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
} }
future<> topology_coordinator::refresh_tablet_load_stats() { future<> topology_coordinator::refresh_tablet_load_stats() {
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
auto tm = get_token_metadata_ptr(); auto tm = get_token_metadata_ptr();
locator::load_stats stats; locator::load_stats stats;
@@ -4725,6 +4723,7 @@ future<> topology_coordinator::run() {
co_await _async_gate.close(); co_await _async_gate.close();
co_await std::move(tablet_load_stats_refresher); co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
co_await std::move(cdc_generation_publisher); co_await std::move(cdc_generation_publisher);
co_await std::move(cdc_streams_gc); co_await std::move(cdc_streams_gc);
co_await std::move(gossiper_orphan_remover); co_await std::move(gossiper_orphan_remover);
@@ -4737,8 +4736,6 @@ future<> topology_coordinator::stop() {
co_await _db.get_notifier().unregister_listener(this); co_await _db.get_notifier().unregister_listener(this);
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization"); utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
_topo_sm.on_tablet_split_ready = nullptr; _topo_sm.on_tablet_split_ready = nullptr;
co_await _lifecycle_notifier.unregister_subscriber(this);
co_await _tablet_load_stats_refresh.join();
// if topology_coordinator::run() is aborted either because we are not a // if topology_coordinator::run() is aborted either because we are not a
// leader anymore, or we are shutting down as a leader, we have to handle // leader anymore, or we are shutting down as a leader, we have to handle
@@ -4800,6 +4797,7 @@ future<> run_topology_coordinator(
topology_cmd_rpc_tracker}; topology_cmd_rpc_tracker};
std::exception_ptr ex; std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);
try { try {
rtlogger.info("start topology coordinator fiber"); rtlogger.info("start topology coordinator fiber");
co_await with_scheduling_group(group0.get_scheduling_group(), [&] { co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4820,7 +4818,7 @@ future<> run_topology_coordinator(
} }
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex)); on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
} }
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min)); co_await lifecycle_notifier.unregister_subscriber(&coordinator);
co_await coordinator.stop(); co_await coordinator.stop();
} }

View File

@@ -543,16 +543,11 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
// during SSTable writing and removed before sealing. If the write // during SSTable writing and removed before sealing. If the write
// failed before sealing, the file may still be on disk and must be // failed before sealing, the file may still be on disk and must be
// cleaned up explicitly. // cleaned up explicitly.
// The component is only defined for the `ms` sstable format; for
// older formats it is absent from the component map and looking up
// its filename would throw std::out_of_range.
// Use file_exists() to avoid a C++ exception on the common path // Use file_exists() to avoid a C++ exception on the common path
// where the file was already removed before sealing. // where the file was already removed before sealing.
if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) { auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes); if (co_await file_exists(temp_hashes)) {
if (co_await file_exists(temp_hashes)) { co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
}
} }
if (sync) { if (sync) {
co_await sst.sstable_write_io_check(sync_directory, dir_name.native()); co_await sst.sstable_write_io_check(sync_directory, dir_name.native());

View File

@@ -135,23 +135,7 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migr
} }
future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) { future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
// _prepared_stmt is a checked_weak_ptr into the prepared statements co_await cache_table_info(qp, mm, qs);
// cache and can be invalidated by a concurrent purge (e.g. on a schema
// change). cache_table_info() (re-)prepares and assigns _prepared_stmt,
// but the pin protecting the entry is dropped when try_prepare()
// returns. In release the chain of ready-future co_awaits back to here
// resumes synchronously, but debug builds preempt on every co_await
// even for ready futures, opening a window for a purge to drop the
// entry and leave _prepared_stmt null. Loop until a synchronous
// post-resume check finds _prepared_stmt valid; nothing can run between
// that check and the dereference below. _insert_stmt is a strong
// shared_ptr and is not affected by cache invalidation.
while (true) {
co_await cache_table_info(qp, mm, qs);
if (_prepared_stmt) {
break;
}
}
auto opts = opt_maker(); auto opts = opt_maker();
opts.prepare(_prepared_stmt->bound_names); opts.prepare(_prepared_stmt->bound_names);
co_await _insert_stmt->execute(qp, qs, opts, std::nullopt); co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);

76
test.py
View File

@@ -11,11 +11,9 @@ from __future__ import annotations
import argparse import argparse
import asyncio import asyncio
import dataclasses
import math import math
import shlex import shlex
import textwrap import textwrap
from bisect import insort
from random import randint from random import randint
import pytest import pytest
@@ -185,8 +183,6 @@ def parse_cmd_line() -> argparse.Namespace:
help="Specific byte limit for failure injection (random by default)") help="Specific byte limit for failure injection (random by default)")
parser.add_argument('--skip-internet-dependent-tests', action="store_true", parser.add_argument('--skip-internet-dependent-tests', action="store_true",
help="Skip tests which depend on artifacts from the internet.") help="Skip tests which depend on artifacts from the internet.")
parser.add_argument('--keep-duplicates', action='store_true', default=False,
help="Do not deduplicate test arguments.")
parser.add_argument("--pytest-arg", action='store', type=str, parser.add_argument("--pytest-arg", action='store', type=str,
default=None, dest="pytest_arg", default=None, dest="pytest_arg",
help="Additional command line arguments to pass to pytest, for example ./test.py --pytest-arg=\"-v -x\"") help="Additional command line arguments to pass to pytest, for example ./test.py --pytest-arg=\"-v -x\"")
@@ -245,73 +241,6 @@ def parse_cmd_line() -> argparse.Namespace:
return args return args
# TODO: Remove _CollectionArgument and _deduplicate_test_args once we update
# to pytest 9.x, which fixes argument deduplication:
# https://github.com/pytest-dev/pytest/issues/12083
@dataclasses.dataclass(frozen=True, order=True)
class _CollectionArgument:
"""Resolved collection argument for deduplication.
A version-independent subset of pytest's CollectionArgument that
includes the fields needed for normalization (parametrization and
original_index were added in pytest 9.0).
``a in b`` means ``b`` subsumes (contains) ``a``. Adapted from
pytest 9.0.3 ``_pytest.main.is_collection_argument_subsumed_by``.
"""
path: pathlib.Path
parts: tuple[str, ...]
parametrization: str
original_index: int
def __contains__(self, other: _CollectionArgument) -> bool:
if self.path != other.path:
return not self.parts and other.path.is_relative_to(self.path)
if len(self.parts) > len(other.parts) or other.parts[:len(self.parts)] != self.parts:
return False
return not self.parametrization or self.parametrization == other.parametrization
def _deduplicate_test_args(args: list[str]) -> list[str]:
"""Remove duplicate and subsumed test arguments.
Resolves and normalizes CLI test arguments, then applies the normalization
algorithm from pytest 9.0.3 to remove exact duplicates and arguments whose
paths are contained within another argument's path.
For example, ``["test/cql", "test/cql/lua_test.cql"]`` becomes ``["test/cql"]``.
"""
if not args:
return args
invocation_path = pathlib.Path.cwd()
resolved_sorted: list[_CollectionArgument] = []
unresolved_indices: set[int] = set()
for i, arg in enumerate(args):
# Adapted from pytest 9.0.3 _pytest.main.resolve_collection_argument.
base, squacket, rest = arg.partition("[")
strpath, *parts = base.split("::")
fspath = pathlib.Path(os.path.abspath(invocation_path / strpath))
if not fspath.exists():
# Keep unresolved args — let pytest report the error.
unresolved_indices.add(i)
continue
insort(resolved_sorted, _CollectionArgument(
path=fspath,
parts=tuple(parts),
parametrization=squacket + rest,
original_index=i,
))
# Normalize: remove duplicates and subsumed arguments using an O(n log n)
# sort-based algorithm adapted from pytest 9.0.3.
normalized = resolved_sorted[:1]
for ca in resolved_sorted[1:]:
if ca not in normalized[-1]:
normalized.append(ca)
kept_indices = {ca.original_index for ca in normalized} | unresolved_indices
return [arg for i, arg in enumerate(args) if i in kept_indices]
def run_pytest(options: argparse.Namespace) -> int: def run_pytest(options: argparse.Namespace) -> int:
# When tests are executed in parallel on different hosts, we need to distinguish results from them. # When tests are executed in parallel on different hosts, we need to distinguish results from them.
# So HOST_ID needed to not overwrite results from different hosts during Jenkins will copy to one directory. # So HOST_ID needed to not overwrite results from different hosts during Jenkins will copy to one directory.
@@ -320,8 +249,7 @@ def run_pytest(options: argparse.Namespace) -> int:
report_dir = temp_dir / 'report' report_dir = temp_dir / 'report'
junit_output_file = report_dir / f'pytest_cpp_{HOST_ID}.xml' junit_output_file = report_dir / f'pytest_cpp_{HOST_ID}.xml'
files_to_run = options.name if options.keep_duplicates else _deduplicate_test_args(options.name) files_to_run = options.name or [str(TOP_SRC_DIR / 'test/')]
files_to_run = files_to_run or [str(TOP_SRC_DIR / 'test/')]
args = [ args = [
'--color=yes', '--color=yes',
f'--repeat={options.repeat}', f'--repeat={options.repeat}',
@@ -341,8 +269,6 @@ def run_pytest(options: argparse.Namespace) -> int:
]) ])
if options.verbose: if options.verbose:
args.append('-v') args.append('-v')
if options.keep_duplicates:
args.append('--keep-duplicates')
if options.quiet: if options.quiet:
args.append('--quiet') args.append('--quiet')
args.extend(['-p','no:sugar']) args.extend(['-p','no:sugar'])

View File

@@ -150,8 +150,6 @@ add_scylla_test(lister_test
KIND SEASTAR) KIND SEASTAR)
add_scylla_test(locator_topology_test add_scylla_test(locator_topology_test
KIND SEASTAR) KIND SEASTAR)
add_scylla_test(lock_tables_metadata_test
KIND SEASTAR)
add_scylla_test(log_heap_test add_scylla_test(log_heap_test
KIND BOOST) KIND BOOST)
add_scylla_test(logalloc_standard_allocator_segment_pool_backend_test add_scylla_test(logalloc_standard_allocator_segment_pool_backend_test
@@ -376,7 +374,7 @@ add_scylla_test(combined_tests
sstable_compression_config_test.cc sstable_compression_config_test.cc
sstable_directory_test.cc sstable_directory_test.cc
sstable_set_test.cc sstable_set_test.cc
sstable_tablet_streaming_test.cc sstable_tablet_streaming.cc
statement_restrictions_test.cc statement_restrictions_test.cc
storage_proxy_test.cc storage_proxy_test.cc
tablets_test.cc tablets_test.cc

View File

@@ -1,36 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <fmt/format.h>
#include <seastar/core/with_timeout.hh>
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
using namespace std::chrono_literals;
// Test that two lock_tables_metadata calls don't deadlock
SEASTAR_TEST_CASE(test_lock_tables_metadata_deadlock) {
return do_with_cql_env_thread([](cql_test_env& e) {
try {
// Repeat the test scenario to increase the chance of hitting the deadlock.
// If no deadlock occurs, each repetition should complete within a fraction of a second,
// so even with 100 repetitions, the total test time should be reasonable.
for (int i = 0; i < 100; ++i) {
with_timeout(lowres_clock::now() + 30s,
when_all_succeed(
e.local_db().lock_tables_metadata(e.db()).discard_result(),
e.local_db().lock_tables_metadata(e.db()).discard_result()
)).get();
}
} catch (seastar::timed_out_error&) {
fmt::print(stderr, "FAIL: lock_tables_metadata deadlocked (timed out after 30s)\n");
_exit(1);
}
});
}

View File

@@ -246,33 +246,6 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
}); });
} }
// Reproducer for SCYLLADB-1697
SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
return sstables::test_env::do_with_async([] (test_env& env) {
for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
testlog.info("Testing sstable version: {}", version);
auto sst = make_sstable_for_this_shard([&env, version] {
return env.make_sstable(test_table_schema(), version);
});
// Sanity: the TOC was written, otherwise the assertion below would be vacuous.
BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());
sst->unlink().get();
std::vector<sstring> remaining;
lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
[&remaining] (fs::path, directory_entry de) {
remaining.push_back(de.name);
return make_ready_future<>();
}).get();
BOOST_REQUIRE_MESSAGE(remaining.empty(),
fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
}
});
}
// Test the absence of TOC. Behavior is controllable by a flag // Test the absence of TOC. Behavior is controllable by a flag
SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) { SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
return sstables::test_env::do_with_async([] (test_env& env) { return sstables::test_env::do_with_async([] (test_env& env) {

View File

@@ -11,11 +11,13 @@ from typing import TYPE_CHECKING
from cassandra.auth import PlainTextAuthProvider from cassandra.auth import PlainTextAuthProvider
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.cluster.dtest.ccmlib.common import logger from test.cluster.dtest.ccmlib.common import logger
from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Iterable
from typing import Any from typing import Any
@@ -27,10 +29,6 @@ class ScyllaCluster:
self.manager = manager self.manager = manager
self.scylla_mode = scylla_mode self.scylla_mode = scylla_mode
self._config_options = {} self._config_options = {}
# Cached ScyllaNode instances. Nodes are appended by _add_nodes()
# in the order they are created by servers_add().
self._nodes: list[ScyllaNode] = []
self._next_node_num: int = 1
if self.scylla_mode == "debug": if self.scylla_mode == "debug":
self.default_wait_other_notice_timeout = 600 self.default_wait_other_notice_timeout = 600
@@ -41,20 +39,19 @@ class ScyllaCluster:
self.force_wait_for_cluster_start = force_wait_for_cluster_start self.force_wait_for_cluster_start = force_wait_for_cluster_start
def _add_nodes(self, servers: list) -> None: @staticmethod
"""Create ScyllaNode instances for the given servers and cache them.""" def _sorted_nodes(servers: Iterable[ServerInfo]) -> list[ServerInfo]:
for server in servers: return sorted(servers, key=lambda s: s.server_id)
name = f"node{self._next_node_num}"
self._next_node_num += 1
self._nodes.append(ScyllaNode(
cluster=self, server=server, name=name))
@property @property
def nodes(self) -> dict[str, ScyllaNode]: def nodes(self) -> dict[str, ScyllaNode]:
return {node.name: node for node in self.nodelist()} return {node.name: node for node in self.nodelist()}
def nodelist(self) -> list[ScyllaNode]: def nodelist(self) -> list[ScyllaNode]:
return list(self._nodes) return [
ScyllaNode(cluster=self, server=server, name=f"node{n}")
for n, server in enumerate(self._sorted_nodes(self.manager.all_servers()), start=1)
]
def get_node_ip(self, nodeid: int) -> str: def get_node_ip(self, nodeid: int) -> str:
return self.nodelist()[nodeid-1].address() return self.nodelist()[nodeid-1].address()
@@ -64,16 +61,16 @@ class ScyllaCluster:
self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra") self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra")
match nodes: match nodes:
case int(): case int():
self._add_nodes(self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")) self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")
case list(): case list():
for dc, n_nodes in enumerate(nodes, start=1): for dc, n_nodes in enumerate(nodes, start=1):
dc_name = f"dc{dc}" dc_name = f"dc{dc}"
self._add_nodes(self.manager.servers_add( self.manager.servers_add(
servers_num=n_nodes, servers_num=n_nodes,
config=self._config_options, config=self._config_options,
start=False, start=False,
auto_rack_dc=dc_name auto_rack_dc=dc_name
)) )
case dict(): case dict():
# Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}} # Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}}
for dc, dc_nodes in nodes.items(): for dc, dc_nodes in nodes.items():
@@ -82,7 +79,7 @@ class ScyllaCluster:
for rack, rack_nodes in dc_nodes.items(): for rack, rack_nodes in dc_nodes.items():
if not isinstance(rack_nodes, int): if not isinstance(rack_nodes, int):
raise RuntimeError(f"Unsupported topology specification: {nodes}") raise RuntimeError(f"Unsupported topology specification: {nodes}")
self._add_nodes(self.manager.servers_add( self.manager.servers_add(
servers_num=rack_nodes, servers_num=rack_nodes,
config=self._config_options, config=self._config_options,
property_file={ property_file={
@@ -90,7 +87,7 @@ class ScyllaCluster:
"rack": rack, "rack": rack,
}, },
start=False, start=False,
)) )
case _: case _:
raise RuntimeError(f"Unsupported topology specification: {nodes}") raise RuntimeError(f"Unsupported topology specification: {nodes}")
@@ -230,6 +227,11 @@ class ScyllaCluster:
def flush(self) -> None: def flush(self) -> None:
self.nodetool("flush") self.nodetool("flush")
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
for node in self.nodelist():
if node.is_running():
node.compact(keyspace=keyspace, tables=tables)
@staticmethod @staticmethod
def debug(message: str) -> None: def debug(message: str) -> None:
logger.debug(message) logger.debug(message)

View File

@@ -17,7 +17,6 @@ from itertools import chain
from functools import cached_property from functools import cached_property
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import logging
from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
from test.pylib.internal_types import ServerUpState from test.pylib.internal_types import ServerUpState
@@ -29,9 +28,6 @@ if TYPE_CHECKING:
from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
logger = logging.getLogger("scylla_node")
NODETOOL_STDERR_IGNORED_PATTERNS = ( NODETOOL_STDERR_IGNORED_PATTERNS = (
re.compile(r"WARNING: debug mode. Not for benchmarking or production"), re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
re.compile( re.compile(
@@ -115,6 +111,7 @@ class ScyllaNode:
self.data_center = server.datacenter self.data_center = server.datacenter
self.rack = server.rack self.rack = server.rack
self._hostid = None
self._smp_set_during_test = None self._smp_set_during_test = None
self._smp = None self._smp = None
self._memory = None self._memory = None
@@ -153,20 +150,15 @@ class ScyllaNode:
return self.cluster.scylla_mode return self.cluster.scylla_mode
def set_smp(self, smp: int) -> None: def set_smp(self, smp: int) -> None:
logger.debug(f"Setting smp: {self=} {smp=}")
self._smp_set_during_test = smp self._smp_set_during_test = smp
def smp(self) -> int: def smp(self) -> int:
logger.debug(f"Getting smp: {self=} _smp_set_during_test={self._smp_set_during_test} _smp={self._smp} {DEFAULT_SMP=}")
return self._smp_set_during_test or self._smp or DEFAULT_SMP return self._smp_set_during_test or self._smp or DEFAULT_SMP
def memory(self) -> int: def memory(self) -> int:
return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU
def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None: def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
if not memory and not smp:
return
logger.debug(f"Adjusting smp={smp} memory={memory} current_smp={self._smp} current_memory={self._memory}")
if memory: if memory:
self._memory = memory // (smp or self.smp()) * self.smp() self._memory = memory // (smp or self.smp()) * self.smp()
if smp: if smp:
@@ -455,8 +447,6 @@ class ScyllaNode:
self.mark = self.mark_log() self.mark = self.mark_log()
logger.debug(f"Starting server: server_id={self.server_id} {scylla_args=} {scylla_env=}")
self.cluster.manager.server_start( self.cluster.manager.server_start(
server_id=self.server_id, server_id=self.server_id,
seeds=None if self.bootstrap else [self.address()], seeds=None if self.bootstrap else [self.address()],
@@ -476,6 +466,9 @@ class ScyllaNode:
if wait_for_binary_proto: if wait_for_binary_proto:
self.wait_for_binary_interface(from_mark=self.mark) self.wait_for_binary_interface(from_mark=self.mark)
if not self._hostid:
self.hostid()
if wait_other_notice: if wait_other_notice:
timeout = self.cluster.default_wait_other_notice_timeout timeout = self.cluster.default_wait_other_notice_timeout
for node, mark in marks: for node, mark in marks:
@@ -658,11 +651,12 @@ class ScyllaNode:
cmd.append(table) cmd.append(table)
self.nodetool(" ".join(cmd), **kwargs) self.nodetool(" ".join(cmd), **kwargs)
def compact(self, keyspace: str = "", tables: str | None = ()) -> None: def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
compact_cmd = ["compact"] compact_cmd = ["compact"]
if keyspace: if keyspace:
compact_cmd.append(keyspace) compact_cmd.append(keyspace)
compact_cmd += tables if tables:
compact_cmd.extend(tables)
self.nodetool(" ".join(compact_cmd)) self.nodetool(" ".join(compact_cmd))
def drain(self, block_on_log: bool = False) -> None: def drain(self, block_on_log: bool = False) -> None:
@@ -835,10 +829,13 @@ class ScyllaNode:
assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest
assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest
try: if not self._hostid:
return self.cluster.manager.get_host_id(server_id=self.server_id) try:
except Exception as exc: self._hostid = self.cluster.manager.get_host_id(server_id=self.server_id)
self.error(f"Failed to get hostid: {exc}") except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
return self._hostid
def rmtree(self, path: str | Path) -> None: def rmtree(self, path: str | Path) -> None:
"""Delete a directory content without removing the directory. """Delete a directory content without removing the directory.

View File

@@ -34,6 +34,7 @@ def pytest_addoption(parser: Parser) -> None:
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None) parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None)
parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)") parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)")
parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster") parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster")
parser.addoption("--compaction-strategy", action="store", default=None, help="Compaction strategy to use in tests that support it (e.g. wide_rows_test.py). One of LeveledCompactionStrategy, SizeTieredCompactionStrategy, TimeWindowCompactionStrategy, or IncrementalCompactionStrategy. If not set, a random strategy is chosen per test.")
def pytest_configure(config: Config) -> None: def pytest_configure(config: Config) -> None:

View File

@@ -1,46 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
import logging
import pytest
from dtest_class import Tester
logger = logging.getLogger(__file__)
@pytest.mark.single_node
class TestSetSmp(Tester):
"""Test that node.set_smp() properly persists across restarts."""
def _get_smp_from_log(self, node, from_mark=None):
"""Extract smp value from the node's log by looking at the SHARD_COUNT gossip value."""
matches = node.grep_log(r"SHARD_COUNT : Value\((\d+),\d+\)", from_mark=from_mark)
assert matches, "Could not find SHARD_COUNT in node log"
# Return the last match (most recent start)
return int(matches[-1][1].group(1))
def test_set_smp(self):
"""Verify that set_smp() takes effect on the next start."""
cluster = self.cluster
cluster.populate(1).start(wait_for_binary_proto=True)
node1 = cluster.nodelist()[0]
default_smp = self._get_smp_from_log(node1)
cluster.stop()
# set_smp to a different value and restart without jvm_args
target_smp = 1 if default_smp != 1 else 2
node1.set_smp(target_smp)
mark = node1.mark_log()
cluster.start(wait_for_binary_proto=True)
node1 = cluster.nodelist()[0]
actual_smp = self._get_smp_from_log(node1, from_mark=mark)
assert actual_smp == target_smp, \
f"Expected smp={target_smp} after set_smp({target_smp}), got {actual_smp}"

View File

@@ -263,3 +263,25 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key])) sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key]))
assert sorted_list1 == sorted_list2 assert sorted_list1 == sorted_list2
def assert_equal_more_with_deviation(actual, expect, deviation_perc):
"""
Assert actual is within inclusive interval [expected...expected+deviation_perc]
@param actual Value inspected
@param expect Beginning of expected interval
@param deviation_perc allowed percent increase
"""
deviation_high = (expect * (100 + deviation_perc)) / 100
assert expect <= actual <= deviation_high, f"Expect result interval {expect}..{deviation_high}, received {actual}"
def assert_less_equal_lists(actual_list, expected_list, msg=None):
"""
Assert actual_list is a subset of the expected list, prints hardcoded or parameterized error message
@param actual_list Inspected list
@param expected_list List that supposed to include actual_list
@param msg Configured message default None.
"""
standardMsg = msg or f"{actual_list} not less than or equal to {expected_list}"
assert set(actual_list) <= set(expected_list), standardMsg

File diff suppressed because it is too large Load Diff

View File

@@ -29,16 +29,12 @@ import pytest
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.auth import PlainTextAuthProvider from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.connection import UnixSocketEndPoint
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.query import BatchStatement, BatchType, SimpleStatement, named_tuple_factory from cassandra.query import BatchStatement, BatchType, SimpleStatement, named_tuple_factory
from test.cluster.conftest import cluster_con
from test.cluster.dtest.dtest_class import create_ks, wait_for from test.cluster.dtest.dtest_class import create_ks, wait_for
from test.cluster.dtest.tools.assertions import assert_invalid from test.cluster.dtest.tools.assertions import assert_invalid
from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
from test.pylib.driver_utils import safe_driver_shutdown
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier from test.pylib.rest_client import read_barrier
from test.pylib.skip_types import skip_env from test.pylib.skip_types import skip_env
@@ -277,7 +273,6 @@ class AuditEntry:
statement: str statement: str
table: str table: str
user: str user: str
source: str = "127.0.0.1"
class AuditBackend: class AuditBackend:
@@ -454,13 +449,6 @@ class AuditBackendSyslog(AuditBackend):
entries.append(self.line_to_row(line, idx)) entries.append(self.line_to_row(line, idx))
return { self.audit_mode(): entries } return { self.audit_mode(): entries }
@staticmethod
def _parse_address(addr_port):
"""Extract IP from 'ip:port' (IPv4) or '[ip]:port' (IPv6)."""
if addr_port.startswith("["):
return addr_port[1:addr_port.index("]")]
return addr_port.split(":")[0]
def line_to_row(self, line, idx): def line_to_row(self, line, idx):
metadata, data = line.split(": ", 1) metadata, data = line.split(": ", 1)
data = "".join(data.splitlines()) # Remove newlines data = "".join(data.splitlines()) # Remove newlines
@@ -472,9 +460,9 @@ class AuditBackendSyslog(AuditBackend):
# and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59) # and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59)
date = datetime.datetime(2000, 1, 1, 0, 0) date = datetime.datetime(2000, 1, 1, 0, 0)
node = self._parse_address(match.group("node")) node = match.group("node").split(":")[0]
statement = match.group("query").replace("\\", "") statement = match.group("query").replace("\\", "")
source = self._parse_address(match.group("client_ip")) source = match.group("client_ip").split(":")[0]
event_time = uuid.UUID(int=idx) event_time = uuid.UUID(int=idx)
t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username")) t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username"))
return t return t
@@ -594,7 +582,6 @@ class CQLAuditTester(AuditTester):
user="anonymous", user="anonymous",
cl="ONE", cl="ONE",
error=False, error=False,
source="127.0.0.1",
): ):
self.assert_audit_row_fields(row) self.assert_audit_row_fields(row)
assert row.node in self.server_addresses assert row.node in self.server_addresses
@@ -603,7 +590,7 @@ class CQLAuditTester(AuditTester):
assert row.error == error assert row.error == error
assert row.keyspace_name == ks assert row.keyspace_name == ks
assert row.operation == statement assert row.operation == statement
assert row.source == source assert row.source == "127.0.0.1"
assert row.table_name == table assert row.table_name == table
assert row.username == user assert row.username == user
@@ -827,7 +814,7 @@ class CQLAuditTester(AuditTester):
sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username)) sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
assert len(sorted_new_rows) == len(expected_entries) assert len(sorted_new_rows) == len(expected_entries)
for row, entry in zip(sorted_new_rows, sorted(expected_entries)): for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error, entry.source) self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
async def verify_keyspace(self, audit_settings=None, helper=None): async def verify_keyspace(self, audit_settings=None, helper=None):
""" """
@@ -1867,44 +1854,6 @@ class CQLAuditTester(AuditTester):
finally: finally:
session.execute("DROP KEYSPACE IF EXISTS kss") session.execute("DROP KEYSPACE IF EXISTS kss")
# Unix domain sockets have no IP peer address. Seastar's
# socket_address::addr() falls through to the default case for
# AF_UNIX and returns a zero-initialised in6_addr, i.e. "::".
MAINTENANCE_SOCKET_SOURCE = "::"
async def _test_audit_maintenance_socket_user_creation(self, manager, helper_class):
with helper_class() as helper:
session = await self.prepare(
user="cassandra", password="cassandra",
helper=helper,
audit_settings={**helper.audit_default_settings, "audit_categories": "DCL", "audit_keyspaces": ""},
create_keyspace=False,
)
servers = await manager.running_servers()
server = servers[0]
socket_path = await manager.server_get_maintenance_socket_path(server.server_id)
logger.info("Connecting to maintenance socket")
endpoint = UnixSocketEndPoint(socket_path)
maint_cluster = cluster_con([endpoint],
load_balancing_policy=WhiteListRoundRobinPolicy([endpoint]))
maint_session = maint_cluster.connect()
role_name = "audit_test_admin"
create_stmt = f"CREATE ROLE {role_name} WITH PASSWORD = 'secret' AND SUPERUSER = true AND LOGIN = true"
expected_operation = f"CREATE ROLE {role_name} WITH PASSWORD = '***' AND SUPERUSER = true AND LOGIN = true"
logger.info("Creating superuser via maintenance socket and verifying audit entry")
expected_entries = [AuditEntry(category="DCL", statement=expected_operation,
user="anonymous", table="", ks="", cl="LOCAL_QUORUM", error=False,
source=self.MAINTENANCE_SOCKET_SOURCE)]
with self.assert_entries_were_added(session, expected_entries):
maint_session.execute(create_stmt)
logger.info("Cleaning up created role")
maint_session.execute(f"DROP ROLE IF EXISTS {role_name}")
safe_driver_shutdown(maint_cluster)
# AuditBackendTable, no auth, rf=1 # AuditBackendTable, no auth, rf=1
@@ -1997,14 +1946,6 @@ async def test_service_level_statements_standalone(manager: ManagerClient):
await CQLAuditTester(manager)._test_service_level_statements() await CQLAuditTester(manager)._test_service_level_statements()
async def test_audit_maintenance_socket_user_creation(manager: ManagerClient):
"""Verify that creating a superuser via the maintenance socket is audited."""
t = CQLAuditTester(manager)
await t._test_audit_maintenance_socket_user_creation(manager, AuditBackendTable)
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
await t._test_audit_maintenance_socket_user_creation(manager, Syslog)
# AuditBackendSyslog, no auth, rf=1 # AuditBackendSyslog, no auth, rf=1
async def test_audit_syslog_noauth(manager: ManagerClient): async def test_audit_syslog_noauth(manager: ManagerClient):

View File

@@ -48,5 +48,6 @@ run_in_dev:
- dtest/commitlog_test - dtest/commitlog_test
- dtest/cfid_test - dtest/cfid_test
- dtest/rebuild_test - dtest/rebuild_test
- dtest/wide_rows_test
run_in_debug: run_in_debug:
- random_failures/test_random_failures - random_failures/test_random_failures

View File

@@ -11,8 +11,7 @@ import pytest
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig from test.pylib.scylla_cluster import ReplaceConfig
from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency, from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency,
get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected, get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected)
wait_for_no_pending_topology_transition)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -20,7 +19,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout: int, scale_timeout: callable) -> None: async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission, """ Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace. bootstrap, removenode, replace.
@@ -58,11 +57,9 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
logger.debug("Kill coordinator during decommission") logger.debug("Kill coordinator during decommission")
coordinator_host = await get_coordinator_host(manager) coordinator_host = await get_coordinator_host(manager)
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id] other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True) await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors") await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60)) await wait_new_coordinator_elected(manager, 2, time.time() + 60)
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.server_restart(coordinator_host.server_id, wait_others=1) await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers()) await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager) await check_token_ring_and_group0_consistency(manager)
@@ -76,40 +73,33 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
node_to_remove_srv_id = other_nodes[-1].server_id node_to_remove_srv_id = other_nodes[-1].server_id
logger.debug("Stop node with srv_id %s", node_to_remove_srv_id) logger.debug("Stop node with srv_id %s", node_to_remove_srv_id)
await manager.server_stop_gracefully(node_to_remove_srv_id) await manager.server_stop_gracefully(node_to_remove_srv_id)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True) await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id) logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id)
await manager.remove_node(working_srv_id, await manager.remove_node(working_srv_id,
node_to_remove_srv_id, node_to_remove_srv_id,
expected_error="Removenode failed. See earlier errors") expected_error="Removenode failed. See earlier errors")
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60)) await wait_new_coordinator_elected(manager, 3, time.time() + 60)
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.others_not_see_server(server_ip=coordinator_host.ip_addr) await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id) logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id)
await manager.server_restart(coordinator_host.server_id, wait_others=1) await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers()) await manager.servers_see_each_other(await manager.running_servers())
logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id) logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id)
await manager.remove_node(working_srv_id, node_to_remove_srv_id) await manager.remove_node(working_srv_id, node_to_remove_srv_id)
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager) await check_token_ring_and_group0_consistency(manager)
logger.debug("Restore number of nodes in cluster") logger.debug("Restore number of nodes in cluster")
await manager.server_add(config=config, cmdline=cmdline) await manager.server_add(cmdline=cmdline)
# kill coordinator during bootstrap # kill coordinator during bootstrap
logger.debug("Kill coordinator during bootstrap") logger.debug("Kill coordinator during bootstrap")
nodes = await manager.running_servers() nodes = await manager.running_servers()
coordinator_host = await get_coordinator_host(manager) coordinator_host = await get_coordinator_host(manager)
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id] other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
new_node = await manager.server_add(start=False, config=config, cmdline=cmdline) new_node = await manager.server_add(start=False, cmdline=cmdline)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True) await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
await manager.server_start(new_node.server_id, await manager.server_start(new_node.server_id,
expected_error="Startup failed: std::runtime_error") expected_error="Startup failed: std::runtime_error")
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60)) await wait_new_coordinator_elected(manager, 4, time.time() + 60)
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.server_restart(coordinator_host.server_id, wait_others=1) await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers()) await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager) await check_token_ring_and_group0_consistency(manager)
@@ -121,13 +111,11 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id] other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
node_to_replace_srv_id = other_nodes[-1].server_id node_to_replace_srv_id = other_nodes[-1].server_id
await manager.server_stop_gracefully(node_to_replace_srv_id) await manager.server_stop_gracefully(node_to_replace_srv_id)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True) await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True) replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True)
new_node = await manager.server_add(start=False, config=config, replace_cfg=replace_cfg, cmdline=cmdline) new_node = await manager.server_add(start=False, replace_cfg=replace_cfg, cmdline=cmdline)
await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors") await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors")
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60)) await wait_new_coordinator_elected(manager, 5, time.time() + 60)
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
logger.debug("Start old coordinator node") logger.debug("Start old coordinator node")
await manager.others_not_see_server(server_ip=coordinator_host.ip_addr) await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
await manager.server_restart(coordinator_host.server_id, wait_others=1) await manager.server_restart(coordinator_host.server_id, wait_others=1)
@@ -135,5 +123,5 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it") logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it")
coordinator_host = await get_coordinator_host(manager) coordinator_host = await get_coordinator_host(manager)
await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id) await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + scale_timeout(60)) await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
await check_token_ring_and_group0_consistency(manager) await check_token_ring_and_group0_consistency(manager)

View File

@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts from test.pylib.util import wait_for_cql_and_get_hosts
from cassandra.query import ConsistencyLevel, SimpleStatement from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,30 +880,41 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification # affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC # divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection. # on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
class _LeadershipTransferred(Exception): # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
"""Raised when leadership transferred to servers[1] during the test, requiring a retry.""" # UNREPAIRED compaction view, making the race easy to trigger deterministically.
pass await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key): # Disable autocompaction everywhere so we control exactly when compaction runs.
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data. for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
Returns the next current_key value.
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
restart, signalling the caller to retry.
"""
# Ensure servers[1] is not the topology coordinator. If the coordinator is
# restarted, the Raft leader dies, a new election occurs, and the new
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
# and marking post-repair data as repaired. That legitimate re-repair masks
# the compaction-merge bug this test detects.
coord = await get_topology_coordinator(manager) coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord) coord_serv = await find_server_by_host_id(manager, servers, coord)
if coord_serv == servers[1]:
other = next(s for s in servers if s != servers[1])
await ensure_group0_leader_on(manager, other)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id) coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark() coord_mark = await coord_log.mark()
@@ -967,16 +978,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
await manager.server_start(target.server_id) await manager.server_start(target.server_id)
await manager.servers_see_each_other(servers) await manager.servers_see_each_other(servers)
# Check if leadership transferred to servers[1] during the restart.
# If so, the new coordinator will re-initiate repair, masking the bug.
new_coord = await get_topology_coordinator(manager)
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
if new_coord_serv == servers[1]:
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
await manager.api.wait_task(servers[0].ip_addr, task_id)
raise _LeadershipTransferred(
"servers[1] became topology coordinator after restart")
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys, # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
# confirming that the bug was triggered (S1' and E merged during the race window). # confirming that the bug was triggered (S1' and E merged during the race window).
deadline = time.time() + 60 deadline = time.time() + 60
@@ -999,7 +1000,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
if not compaction_ran: if not compaction_ran:
logger.warning("Compaction did not merge S1' and E after restart during the race window; " logger.warning("Compaction did not merge S1' and E after restart during the race window; "
"the bug was not triggered. Skipping assertion.") "the bug was not triggered. Skipping assertion.")
return current_key return
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED. # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1030,9 +1031,8 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, " f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}") f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
# servers[0] and servers[2] were never restarted and the coordinator stayed # servers[0] and servers[2] flushed post-repair keys after the race window closed,
# alive throughout, so no re-repair could have flushed their memtables. # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
# Post-repair keys must NOT appear in repaired sstables on these servers.
assert not (repaired_keys_0 & post_repair_key_set), \ assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \ f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}" f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,54 +1053,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"on servers[1] after restart lost the being_repaired markers during the race window. " \ f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \ f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}" f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
return current_key
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
# If leadership transfers to servers[1] between our coordinator check and the
# restart, the coordinator change masks the bug. Detect and retry.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
current_key = await _do_race_window_promotes_unrepaired_data(
manager, servers, cql, ks, token, scylla_path, current_key)
return
except _LeadershipTransferred as e:
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
"could not run the test without coordinator interference.")
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# Tombstone GC safety tests # Tombstone GC safety tests

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
# #
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table from test.cluster.util import get_topology_coordinator, trigger_stepdown
import pytest import pytest
import logging import logging
@@ -83,78 +83,3 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager) coord3 = await get_topology_coordinator(manager)
if coord3: if coord3:
break break
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
"""Verify that _tablet_load_stats_refresh is properly joined during
topology coordinator shutdown, even when a schema change notification
triggers a refresh between run() completing and stop() being called.
Reproduces the scenario using two injection points:
- topology_coordinator_pause_before_stop: pauses after run() finishes
but before stop() is called
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
so it's still in-flight during shutdown
Without the join in stop(), the refresh task outlives the coordinator
and accesses freed memory.
"""
servers = await manager.servers_add(3)
await manager.get_ready_cql(servers)
async with new_test_keyspace(manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
coord = await get_topology_coordinator(manager)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
coord_idx = host_ids.index(coord)
coord_server = servers[coord_idx]
log = await manager.server_open_log(coord_server.server_id)
mark = await log.mark()
# Injection B: pause between run() returning and stop() being called.
await manager.api.enable_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
# Stepdown causes the topology coordinator to abort and shut down.
logger.info("Triggering stepdown on coordinator")
await trigger_stepdown(manager, coord_server)
# Wait for injection B to fire. The coordinator has finished run() but
# the schema change listener is still registered.
mark, _ = await log.wait_for(
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
# Enable it now so it only catches the notification-triggered call.
await manager.api.enable_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
# CREATE TABLE fires on_create_column_family on the old coordinator which
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
# via with_scheduling_group on the gossip scheduling group.
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
# Release injection B: coordinator proceeds through stop().
# Without the fix, stop() returns quickly and run_topology_coordinator
# frees the topology_coordinator frame. With the fix, stop() blocks at
# _tablet_load_stats_refresh.join() until injection A is released.
logger.info("Releasing injection B: coordinator will stop")
await manager.api.message_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
# Release injection A: refresh_tablet_load_stats() resumes and accesses
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
# points to freed memory and ASan detects heap-use-after-free.
logger.info("Releasing injection A: refresh resumes")
await manager.api.message_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
# If the bug is present, the node crashed. read_barrier will fail.
await read_barrier(manager.api, coord_server.ip_addr)

View File

@@ -435,9 +435,8 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None: async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr): async def get_replication_options(ks: str):
await read_barrier(manager.api, ip_addr) res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication) repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl return repl
@@ -452,44 +451,43 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
host_ids = [await manager.get_host_id(s.server_id) for s in servers] host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql() cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);") await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};") await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
await cql.run_async("create table ks2.t (pk int primary key);") await cql.run_async("create table ks2.t (pk int primary key);")
repl = await get_replication_options("ks2", host, servers[0].ip_addr) repl = await get_replication_options("ks2")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
assert repl['dc2'] == '2' assert repl['dc2'] == '2'
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};") await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks3.t (pk int primary key);") await cql.run_async("create table ks3.t (pk int primary key);")
repl = await get_replication_options("ks3", host, servers[0].ip_addr) repl = await get_replication_options("ks3")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};") await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks4.t (pk int primary key);") await cql.run_async("create table ks4.t (pk int primary key);")
repl = await get_replication_options("ks4", host, servers[0].ip_addr) repl = await get_replication_options("ks4")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks5.t (pk int primary key);") await cql.run_async("create table ks5.t (pk int primary key);")
repl = await get_replication_options("ks5", host, servers[0].ip_addr) repl = await get_replication_options("ks5")
assert repl['dc1'] == '2' assert repl['dc1'] == '2'
assert repl['dc2'] == '2' assert repl['dc2'] == '2'
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks6.t (pk int primary key);") await cql.run_async("create table ks6.t (pk int primary key);")
repl = await get_replication_options("ks6", host, servers[0].ip_addr) repl = await get_replication_options("ks6")
assert repl['dc1'] == '2' assert repl['dc1'] == '2'
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers] [await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};") await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b'] assert repl['dc1'] == ['rack1b']
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t") tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -499,7 +497,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
assert r.replicas[0][0] == host_ids[1] assert r.replicas[0][0] == host_ids[1]
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};") await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr) repl = await get_replication_options("ks2")
assert repl['dc1'] == ['rack1a'] assert repl['dc1'] == ['rack1a']
assert len(repl['dc2']) == 2 assert len(repl['dc2']) == 2
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2'] assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -525,13 +523,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
pass pass
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};") await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
repl = await get_replication_options("ks5", host, servers[0].ip_addr) repl = await get_replication_options("ks5")
assert len(repl['dc1']) == 2 assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1'] assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
assert repl['dc2'] == '2' assert repl['dc2'] == '2'
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};") await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
repl = await get_replication_options("ks6", host, servers[0].ip_addr) repl = await get_replication_options("ks6")
assert repl['dc1'] == '2' assert repl['dc1'] == '2'
assert len(repl['dc2']) == 1 assert len(repl['dc2']) == 1
assert repl['dc2'][0] == 'rack2a' assert repl['dc2'][0] == 'rack2a'
@@ -539,9 +537,8 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None: async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr): async def get_replication_options(ks: str):
await read_barrier(manager.api, ip_addr) res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication) repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl return repl
@@ -554,11 +551,10 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})] await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
cql = manager.get_cql() cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);") await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}") await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -578,19 +574,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
servers = servers[0:-1] servers = servers[0:-1]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};") await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b'] assert repl['dc1'] == ['rack1b']
logging.info("Rolling restart") logging.info("Rolling restart")
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"]) await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])
await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr) repl = await get_replication_options("ks2")
assert len(repl['dc1']) == 2 assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1'] assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks3", host, servers[0].ip_addr) repl = await get_replication_options("ks3")
assert len(repl['dc1']) == 1 assert len(repl['dc1']) == 1
assert len(repl['dc2']) == 1 assert len(repl['dc2']) == 1
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1'] assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -606,7 +602,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
assert failed assert failed
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};") await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert len(repl['dc1']) == 1 assert len(repl['dc1']) == 1
assert repl['dc1'][0] == 'rack1b' assert repl['dc1'][0] == 'rack1b'
assert len(repl['dc2']) == 1 assert len(repl['dc2']) == 1
@@ -1113,9 +1109,8 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None: async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr): async def get_replication_options(ks: str):
await read_barrier(manager.api, ip_addr) res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication) repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl return repl
@@ -1133,11 +1128,10 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
host_ids = [await manager.get_host_id(s.server_id) for s in servers] host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql() cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};") await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);") await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers] [await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1171,7 +1165,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
failed = True failed = True
assert failed assert failed
repl = await get_replication_options("ks1", host, servers[0].ip_addr) repl = await get_replication_options("ks1")
assert repl['dc1'] == '1' assert repl['dc1'] == '1'
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@@ -18,7 +18,7 @@ from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvail
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo, HostID from test.pylib.internal_types import ServerInfo, HostID
from test.pylib.manager_client import ManagerClient from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError, get_host_api_address, read_barrier from test.pylib.rest_client import get_host_api_address, read_barrier
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
from typing import Optional, List, Union from typing import Optional, List, Union
@@ -119,42 +119,6 @@ async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> Non
assert token_ring_ids == group0_ids assert token_ring_ids == group0_ids
async def wait_for_no_pending_topology_transition(manager: ManagerClient, deadline: float) -> None:
"""Wait until there is no pending topology transition.
Polls system.topology until the transition_state column is null,
indicating that the topology coordinator has finished processing the
current operation (whether it completed successfully or was rolled back).
"""
cql = manager.get_cql()
async def no_transition():
try:
host = await get_available_host(cql, deadline)
await read_barrier(manager.api, get_host_api_address(host))
rs = await cql.run_async(
"select transition_state from system.topology where key = 'topology'",
host=host)
except NoHostAvailable as e:
logger.info(f"Topology transition check failed, retrying: {e}")
return None
except ConnectionException as e:
logger.info(f"Topology transition check failed, retrying: {e}")
return None
except HTTPError as e:
logger.info(f"Read barrier failed, retrying: {e}")
return None
if not rs:
logger.warning(f"Topology transition not visible: system.topology row not found, retrying")
return None
if rs[0].transition_state is not None:
logger.warning(f"Topology transition still in progress: {rs[0].transition_state}")
return None
return True
await wait_for(no_transition, deadline, period=.5)
async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None: async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None:
""" """
Weaker version of the above check. Weaker version of the above check.
@@ -434,14 +398,13 @@ def get_uuid_from_str(string: str) -> str:
async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None: async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None:
"""Wait new coordinator to be elected """Wait new coordinator to be elected
Wait while the table 'system.group0_history' will have at least Wait while the table 'system.group0_history' will have a number of lines
expected_num_of_elections lines with 'new topology coordinator', with the 'new topology coordinator' equal to the expected_num_of_elections number,
and the latest host_id coordinator differs from the previous one. and the latest host_id coordinator differs from the previous one.
""" """
async def new_coordinator_elected(): async def new_coordinator_elected():
coordinators_ids = await get_coordinator_host_ids(manager) coordinators_ids = await get_coordinator_host_ids(manager)
logger.debug(f"Coordinators ids in history: {coordinators_ids}") if len(coordinators_ids) == expected_num_of_elections \
if len(coordinators_ids) >= expected_num_of_elections \
and coordinators_ids[0] != coordinators_ids[1]: and coordinators_ids[0] != coordinators_ids[1]:
return True return True
logger.warning("New coordinator was not elected %s", coordinators_ids) logger.warning("New coordinator was not elected %s", coordinators_ids)

View File

@@ -18,7 +18,6 @@
#include <seastar/testing/test_case.hh> #include <seastar/testing/test_case.hh>
#include "test/lib/exception_utils.hh" #include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh" #include "test/lib/test_utils.hh"
#include "ldap_common.hh" #include "ldap_common.hh"
#include "service/migration_manager.hh" #include "service/migration_manager.hh"
@@ -682,41 +681,3 @@ SEASTAR_TEST_CASE(ldap_config) {
}, },
make_ldap_config()); make_ldap_config());
} }
// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
auto cfg = make_ldap_config();
cfg->permissions_update_interval_in_ms.set(1);
auto call_count = seastar::make_lw_shared<int>(0);
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
auto& cache = env.auth_cache().local();
testlog.info("Populating 50 cache entries");
for (int i = 0; i < 50; i++) {
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
cache.get_permissions(auth::role_or_anonymous(), r).get();
}
testlog.info("Installing slow permission loader (10ms per call)");
cache.set_permission_loader(
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
-> seastar::future<auth::permission_set> {
++(*call_count);
co_await seastar::sleep(std::chrono::milliseconds(10));
co_return auth::permission_set();
});
testlog.info("Waiting for pruner to start reloading");
while (*call_count == 0) {
seastar::sleep(std::chrono::milliseconds(1)).get();
}
testlog.info("Pruner started, letting teardown run");
}, cfg);
testlog.info("Loader called {} times", *call_count);
}

View File

@@ -371,13 +371,9 @@ int scylla_simple_query_main(int argc, char** argv) {
audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) { audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) {
fmt::print("audit start failed: {}", e); fmt::print("audit start failed: {}", e);
}).get(); }).get();
audit::audit::start_storage(env.local_db().get_config()).get();
auto audit_stop = defer([] { auto audit_stop = defer([] {
audit::audit::stop_audit().get(); audit::audit::stop_audit().get();
}); });
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
auto results = do_cql_test(env, cfg); auto results = do_cql_test(env, cfg);
aggregated_perf_results agg(results); aggregated_perf_results agg(results);
std::cout << agg << std::endl; std::cout << agg << std::endl;

View File

@@ -72,12 +72,16 @@ class CppFile(pytest.File, ABC):
self.test_name = self.path.stem self.test_name = self.path.stem
# Implement following properties as cached_property because they are read-only, and based on stash items which # Implement following properties as cached_property because they are read-only, and based on stash items which
# will be assigned in test/pylib/runner.py::pytest_collect_file() and modify_pytest_item() after instance creation. # will be assigned in test/pylib/runner.py::pytest_collect_file() hook after a CppFile instance was created.
@cached_property @cached_property
def build_mode(self) -> str: def build_mode(self) -> str:
return self.stash[BUILD_MODE] return self.stash[BUILD_MODE]
@cached_property
def run_id(self) -> int:
return self.stash[RUN_ID]
@cached_property @cached_property
def suite_config(self) -> dict[str, Any]: def suite_config(self) -> dict[str, Any]:
return self.stash[TEST_SUITE].cfg return self.stash[TEST_SUITE].cfg
@@ -162,13 +166,9 @@ class CppTestCase(pytest.Item):
self.own_markers = [getattr(pytest.mark, mark_name) for mark_name in own_markers] self.own_markers = [getattr(pytest.mark, mark_name) for mark_name in own_markers]
self.add_marker(pytest.mark.cpp) self.add_marker(pytest.mark.cpp)
@cached_property
def run_id(self) -> int:
return self.stash[RUN_ID]
def get_artifact_path(self, extra: str = "", suffix: str = "") -> pathlib.Path: def get_artifact_path(self, extra: str = "", suffix: str = "") -> pathlib.Path:
return self.parent.log_dir / ".".join( return self.parent.log_dir / ".".join(
(self.path.relative_to(TEST_DIR).with_suffix("") / f"{self.name}{extra}.{self.run_id}{suffix}").parts (self.path.relative_to(TEST_DIR).with_suffix("") / f"{self.name}{extra}.{self.parent.run_id}{suffix}").parts
) )
def make_testpy_test_object_mock(self) -> SimpleNamespace: def make_testpy_test_object_mock(self) -> SimpleNamespace:
@@ -179,7 +179,7 @@ class CppTestCase(pytest.Item):
return SimpleNamespace( return SimpleNamespace(
time_end=0, time_end=0,
time_start=0, time_start=0,
id=self.run_id, id=self.parent.run_id,
mode=self.parent.build_mode, mode=self.parent.build_mode,
success=False, success=False,
shortname=self.name, shortname=self.name,

View File

@@ -15,7 +15,7 @@ import random
import sys import sys
from argparse import BooleanOptionalAction from argparse import BooleanOptionalAction
from collections import defaultdict from collections import defaultdict
from itertools import chain, count from itertools import chain, count, product
from functools import cache, cached_property from functools import cache, cached_property
from pathlib import Path from pathlib import Path
from random import randint from random import randint
@@ -106,7 +106,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
" '--logger-log-level raft=trace --default-log-level error'") " '--logger-log-level raft=trace --default-log-level error'")
parser.addoption('--x-log2-compaction-groups', action="store", default="0", type=int, parser.addoption('--x-log2-compaction-groups', action="store", default="0", type=int,
help="Controls number of compaction groups to be used by Scylla tests. Value of 3 implies 8 groups.") help="Controls number of compaction groups to be used by Scylla tests. Value of 3 implies 8 groups.")
parser.addoption('--repeat', action="store", default=1, type=int, parser.addoption('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution") help="number of times to repeat test execution")
# Pass information about Scylla node from test.py to pytest. # Pass information about Scylla node from test.py to pytest.
@@ -162,10 +162,9 @@ def scylla_binary(testpy_test) -> Path:
return testpy_test.suite.scylla_exe return testpy_test.suite.scylla_exe
def pytest_collection_modifyitems(items: list[pytest.Item], config: pytest.Config) -> None: def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
run_ids = defaultdict(lambda: count(start=int(config.getoption("--run_id") or 1)))
for item in items: for item in items:
modify_pytest_item(item=item, run_ids=run_ids) modify_pytest_item(item=item)
suites_order = defaultdict(count().__next__) # number suites in order of appearance suites_order = defaultdict(count().__next__) # number suites in order of appearance
@@ -286,10 +285,7 @@ def pytest_configure(config: pytest.Config) -> None:
pytest_log_dir.mkdir(parents=True, exist_ok=True) pytest_log_dir.mkdir(parents=True, exist_ok=True)
if not _pytest_config.getoption("--save-log-on-success"): if not _pytest_config.getoption("--save-log-on-success"):
for file in pytest_log_dir.glob("*"): for file in pytest_log_dir.glob("*"):
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them. file.unlink()
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
# condition, especially with repeat.
file.unlink(missing_ok=True)
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log" _pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
@@ -317,15 +313,14 @@ def pytest_configure(config: pytest.Config) -> None:
os.environ["TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED"] = os.environ.get("TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED", str(random.randint(0, sys.maxsize))) os.environ["TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED"] = os.environ.get("TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED", str(random.randint(0, sys.maxsize)))
config.build_modes = get_modes_to_run(config) config.build_modes = get_modes_to_run(config)
repeat = int(config.getoption("--repeat"))
if testpy_run_id := config.getoption("--run_id"): if testpy_run_id := config.getoption("--run_id"):
if config.getoption("--repeat") != 1: if repeat != 1:
raise RuntimeError("Can't use --run_id and --repeat simultaneously.") raise RuntimeError("Can't use --run_id and --repeat simultaneously.")
config.run_ids = (testpy_run_id,)
else:
class DisabledFile(pytest.File): config.run_ids = tuple(range(1, repeat + 1))
def collect(self) -> list[pytest.Item]:
pytest.skip("All tests in this file are disabled in requested modes according to the suite config.")
@pytest.hookimpl(wrapper=True) @pytest.hookimpl(wrapper=True)
@@ -342,16 +337,19 @@ def pytest_collect_file(file_path: pathlib.Path,
mode for mode in build_modes mode for mode in build_modes
if not suite_config.is_test_disabled(build_mode=mode, path=file_path) if not suite_config.is_test_disabled(build_mode=mode, path=file_path)
) )
if repeats := [mode for mode in build_modes for _ in range(parent.config.getoption("--repeat"))]: repeats = list(product(build_modes, parent.config.run_ids))
ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable( if not repeats:
ihook.pytest_collect_file(file_path=file_path, parent=parent) for _ in range(1, len(repeats)) return []
)))
for build_mode, collector in zip(repeats, collectors, strict=True): ihook = parent.ihook
collector.stash[BUILD_MODE] = build_mode collectors = list(chain(collectors, chain.from_iterable(
collector.stash[TEST_SUITE] = suite_config ihook.pytest_collect_file(file_path=file_path, parent=parent) for _ in range(1, len(repeats))
else: )))
collectors = [DisabledFile.from_parent(parent=parent, path=file_path)] for (build_mode, run_id), collector in zip(repeats, collectors, strict=True):
collector.stash[BUILD_MODE] = build_mode
collector.stash[RUN_ID] = run_id
collector.stash[TEST_SUITE] = suite_config
parent.stash[REPEATING_FILES].remove(file_path) parent.stash[REPEATING_FILES].remove(file_path)
@@ -425,7 +423,7 @@ class TestSuiteConfig:
TEST_SUITE = pytest.StashKey[TestSuiteConfig | None]() TEST_SUITE = pytest.StashKey[TestSuiteConfig | None]()
_STASH_KEYS_TO_COPY = BUILD_MODE, TEST_SUITE _STASH_KEYS_TO_COPY = BUILD_MODE, RUN_ID, TEST_SUITE
def get_params_stash(node: _pytest.nodes.Node) -> pytest.Stash | None: def get_params_stash(node: _pytest.nodes.Node) -> pytest.Stash | None:
@@ -435,12 +433,11 @@ def get_params_stash(node: _pytest.nodes.Node) -> pytest.Stash | None:
return parent.stash return parent.stash
def modify_pytest_item(item: pytest.Item, run_ids: defaultdict[tuple[str, str], count]) -> None: def modify_pytest_item(item: pytest.Item) -> None:
params_stash = get_params_stash(node=item) params_stash = get_params_stash(node=item)
for key in _STASH_KEYS_TO_COPY: for key in _STASH_KEYS_TO_COPY:
item.stash[key] = params_stash[key] item.stash[key] = params_stash[key]
item.stash[RUN_ID] = next(run_ids[(item.stash[BUILD_MODE], item._nodeid)])
suffix = f".{item.stash[BUILD_MODE]}.{item.stash[RUN_ID]}" suffix = f".{item.stash[BUILD_MODE]}.{item.stash[RUN_ID]}"

View File

@@ -75,7 +75,6 @@ def test_no_bare_skip_markers_in_collection():
"--collect-only", "--collect-only",
"--ignore=boost", "--ignore=raft", "--ignore=boost", "--ignore=raft",
"--ignore=ldap", "--ignore=vector_search", "--ignore=ldap", "--ignore=vector_search",
"--ignore=unit",
"-p", "no:sugar"], "-p", "no:sugar"],
capture_output=True, text=True, capture_output=True, text=True,
cwd=str(_TEST_ROOT), cwd=str(_TEST_ROOT),