mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-30 13:17:01 +00:00
Compare commits
13 Commits
next
...
migrate-wi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
211eb7c32b | ||
|
|
3a640a9ff0 | ||
|
|
8288f87beb | ||
|
|
77c03354f4 | ||
|
|
49df9242f7 | ||
|
|
8c82c6646b | ||
|
|
d76d0b8a16 | ||
|
|
690672e4cb | ||
|
|
85079d7c7a | ||
|
|
70f8fcbe67 | ||
|
|
1d6403ddad | ||
|
|
9a24be2fe9 | ||
|
|
5c93ccb6d8 |
@@ -194,36 +194,22 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
|
||||
std::move(audited_keyspaces),
|
||||
std::move(audited_tables),
|
||||
std::move(audited_categories),
|
||||
std::cref(cfg));
|
||||
}
|
||||
|
||||
future<> audit::start_storage(const db::config& cfg) {
|
||||
if (!audit_instance().local_is_initialized()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
|
||||
return local_audit._storage_helper_ptr->start(cfg).then([&local_audit] {
|
||||
local_audit._storage_running = true;
|
||||
std::cref(cfg))
|
||||
.then([&cfg] {
|
||||
if (!audit_instance().local_is_initialized()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
|
||||
return local_audit.start(cfg);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> audit::stop_storage() {
|
||||
if (!audit_instance().local_is_initialized()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return audit_instance().invoke_on_all([] (audit& local_audit) {
|
||||
local_audit._storage_running = false;
|
||||
return local_audit._storage_helper_ptr->stop();
|
||||
});
|
||||
}
|
||||
|
||||
future<> audit::stop_audit() {
|
||||
if (!audit_instance().local_is_initialized()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) {
|
||||
SCYLLA_ASSERT(!local_audit._storage_running);
|
||||
return local_audit.shutdown();
|
||||
}).then([] {
|
||||
return audit::audit::audit_instance().stop();
|
||||
@@ -237,6 +223,14 @@ audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& k
|
||||
return std::make_unique<audit_info>(cat, keyspace, table, batch);
|
||||
}
|
||||
|
||||
future<> audit::start(const db::config& cfg) {
|
||||
return _storage_helper_ptr->start(cfg);
|
||||
}
|
||||
|
||||
future<> audit::stop() {
|
||||
return _storage_helper_ptr->stop();
|
||||
}
|
||||
|
||||
future<> audit::shutdown() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -247,12 +241,6 @@ future<> audit::log(const audit_info& audit_info, const service::client_state& c
|
||||
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
|
||||
socket_address client_ip = client_state.get_client_address().addr();
|
||||
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
|
||||
if (!_storage_running) {
|
||||
on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
|
||||
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
|
||||
audit_info.query(), client_ip, audit_info.table(), username));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
|
||||
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
|
||||
@@ -298,11 +286,6 @@ future<> inspect(const audit_info_alternator& ai, const service::client_state& c
|
||||
|
||||
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
|
||||
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
|
||||
if (!_storage_running) {
|
||||
on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}",
|
||||
node_ip, client_ip, username, error ? "true" : "false"));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
|
||||
node_ip, client_ip, username, error ? "true" : "false");
|
||||
|
||||
@@ -141,7 +141,6 @@ private:
|
||||
category_set _audited_categories;
|
||||
|
||||
std::unique_ptr<storage_helper> _storage_helper_ptr;
|
||||
bool _storage_running = false;
|
||||
|
||||
const db::config& _cfg;
|
||||
utils::observer<sstring> _cfg_keyspaces_observer;
|
||||
@@ -164,8 +163,6 @@ public:
|
||||
return audit_instance().local();
|
||||
}
|
||||
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
|
||||
static future<> start_storage(const db::config& cfg);
|
||||
static future<> stop_storage();
|
||||
static future<> stop_audit();
|
||||
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false);
|
||||
audit(locator::shared_token_metadata& stm,
|
||||
@@ -177,6 +174,8 @@ public:
|
||||
category_set&& audited_categories,
|
||||
const db::config& cfg);
|
||||
~audit();
|
||||
future<> start(const db::config& cfg);
|
||||
future<> stop();
|
||||
future<> shutdown();
|
||||
bool should_log(const audit_info& audit_info) const;
|
||||
bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const;
|
||||
|
||||
@@ -258,11 +258,13 @@ future<> ldap_role_manager::start() {
|
||||
} catch (const seastar::sleep_aborted&) {
|
||||
co_return; // ignore
|
||||
}
|
||||
try {
|
||||
co_await _cache.reload_all_permissions();
|
||||
} catch (...) {
|
||||
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
||||
}
|
||||
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
|
||||
try {
|
||||
co_await c.reload_all_permissions();
|
||||
} catch (...) {
|
||||
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
return _std_mgr.start();
|
||||
|
||||
@@ -157,20 +157,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
||||
return create_legacy_keyspace_if_missing(mm);
|
||||
});
|
||||
}
|
||||
// Authorizer must be started before the permission loader is set,
|
||||
// because the loader calls _authorizer->authorize().
|
||||
// The loader must be set before starting the role manager, because
|
||||
// LDAP role manager starts a pruner fiber that calls
|
||||
// reload_all_permissions() which asserts _permission_loader is set.
|
||||
co_await _authorizer->start();
|
||||
if (!_used_by_maintenance_socket) {
|
||||
// Maintenance socket mode can't cache permissions because it has
|
||||
// different authorizer. We can't mix cached permissions, they could be
|
||||
// different in normal mode.
|
||||
_cache.set_permission_loader(std::bind(
|
||||
&service::get_uncached_permissions,
|
||||
this, std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
co_await _role_manager->start();
|
||||
if (this_shard_id() == 0) {
|
||||
// Role manager and password authenticator have this odd startup
|
||||
@@ -179,19 +165,21 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
||||
// creation therefore we need to wait here.
|
||||
co_await _role_manager->ensure_superuser_is_created();
|
||||
}
|
||||
// Authenticator must be started after ensure_superuser_is_created()
|
||||
// because password_authenticator queries system.roles for the
|
||||
// superuser entry created by the role manager.
|
||||
co_await _authenticator->start();
|
||||
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
|
||||
if (!_used_by_maintenance_socket) {
|
||||
// Maintenance socket mode can't cache permissions because it has
|
||||
// different authorizer. We can't mix cached permissions, they could be
|
||||
// different in normal mode.
|
||||
_cache.set_permission_loader(std::bind(
|
||||
&service::get_uncached_permissions,
|
||||
this, std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
}
|
||||
|
||||
future<> service::stop() {
|
||||
_as.request_abort();
|
||||
// Reverse of start() order.
|
||||
co_await _authenticator->stop();
|
||||
co_await _role_manager->stop();
|
||||
_cache.set_permission_loader(nullptr);
|
||||
co_await _authorizer->stop();
|
||||
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
|
||||
}
|
||||
|
||||
future<> service::ensure_superuser_is_created() {
|
||||
|
||||
@@ -593,7 +593,6 @@ scylla_tests = set([
|
||||
'test/boost/linearizing_input_stream_test',
|
||||
'test/boost/lister_test',
|
||||
'test/boost/locator_topology_test',
|
||||
'test/boost/lock_tables_metadata_test',
|
||||
'test/boost/log_heap_test',
|
||||
'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
|
||||
'test/boost/logalloc_test',
|
||||
@@ -1711,7 +1710,7 @@ deps['test/boost/combined_tests'] += [
|
||||
'test/boost/sstable_compression_config_test.cc',
|
||||
'test/boost/sstable_directory_test.cc',
|
||||
'test/boost/sstable_set_test.cc',
|
||||
'test/boost/sstable_tablet_streaming_test.cc',
|
||||
'test/boost/sstable_tablet_streaming.cc',
|
||||
'test/boost/statement_restrictions_test.cc',
|
||||
'test/boost/storage_proxy_test.cc',
|
||||
'test/boost/tablets_test.cc',
|
||||
|
||||
@@ -399,10 +399,9 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
|
||||
}
|
||||
}
|
||||
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
|
||||
auto ack2_msg_str = fmt::format("{}", ack2_msg);
|
||||
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
|
||||
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
|
||||
co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
|
||||
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
|
||||
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
|
||||
}
|
||||
|
||||
// Depends on
|
||||
|
||||
46
main.cc
46
main.cc
@@ -1810,18 +1810,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
utils::get_local_injector().inject("stop_after_starting_migration_manager",
|
||||
[] { std::raise(SIGSTOP); });
|
||||
|
||||
// Audit must be constructed before the maintenance socket so
|
||||
// that on shutdown (reverse destruction order) the audit service
|
||||
// outlives the maintenance socket and in-flight queries can
|
||||
// still reach audit::inspect() safely.
|
||||
checkpoint(stop_signal, "starting audit service");
|
||||
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
|
||||
startlog.error("audit start failed: {}", e);
|
||||
}).get();
|
||||
auto audit_stop = defer([] {
|
||||
audit::audit::stop_audit().get();
|
||||
});
|
||||
|
||||
// XXX: stop_raft has to happen before query_processor and migration_manager
|
||||
// is stopped, since some groups keep using the query
|
||||
// processor until are stopped inside stop_raft.
|
||||
@@ -2352,22 +2340,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}).get();
|
||||
stop_signal.ready(false);
|
||||
|
||||
// At this point, `locator::topology` should be stable, i.e. we should have complete information
|
||||
// about the layout of the cluster (= list of nodes along with the racks/DCs).
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
db.local().check_rf_rack_validity(token_metadata.local().get());
|
||||
|
||||
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
|
||||
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
|
||||
|
||||
// The table-based audit backend needs Raft (via join_cluster)
|
||||
// to create its keyspace and table.
|
||||
checkpoint(stop_signal, "starting audit storage");
|
||||
audit::audit::start_storage(*cfg).get();
|
||||
auto audit_storage_stop = defer([] {
|
||||
audit::audit::stop_storage().get();
|
||||
});
|
||||
|
||||
if (cfg->maintenance_socket() != "ignore") {
|
||||
// Enable role operations now that node joined the cluster
|
||||
maintenance_auth_service.invoke_on_all([](auth::service& svc) {
|
||||
@@ -2377,6 +2349,24 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
|
||||
}
|
||||
|
||||
// At this point, `locator::topology` should be stable, i.e. we should have complete information
|
||||
// about the layout of the cluster (= list of nodes along with the racks/DCs).
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
db.local().check_rf_rack_validity(token_metadata.local().get());
|
||||
|
||||
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
|
||||
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
|
||||
|
||||
// Start audit service after join_cluster so that the table-based audit backend
|
||||
// can properly create its keyspace and table.
|
||||
checkpoint(stop_signal, "starting audit service");
|
||||
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
|
||||
startlog.error("audit start failed: {}", e);
|
||||
}).get();
|
||||
auto audit_stop = defer([] {
|
||||
audit::audit::stop_audit().get();
|
||||
});
|
||||
|
||||
// Semantic validation of sstable compression parameters from config.
|
||||
// Adding here (i.e., after `join_cluster`) to ensure that the
|
||||
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
|
||||
|
||||
@@ -1328,27 +1328,9 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,
|
||||
|
||||
future<tables_metadata_lock_on_all_shards> database::lock_tables_metadata(sharded<database>& sharded_db) {
|
||||
tables_metadata_lock_on_all_shards locks;
|
||||
// Acquire write lock on shard 0 first, and then on the remaining shards.
|
||||
//
|
||||
// Parallel acquisition on all shards could deadlock when two
|
||||
// fibers call lock_tables_metadata() concurrently: parallel_for_each
|
||||
// sends SMP messages to all shards even when the local shard's lock
|
||||
// attempt blocks. If task reordering (SEASTAR_SHUFFLE_TASK_QUEUE in
|
||||
// debug/sanitize builds) causes fiber A to win on shard X while
|
||||
// fiber B wins on shard Y, neither can make progress — classic
|
||||
// cross-shard lock-ordering deadlock.
|
||||
//
|
||||
// Acquiring the write lock on shard 0 first, and then on the remaining
|
||||
// shards, eliminates this: whichever fiber acquires shard 0 first is
|
||||
// guaranteed to acquire locks on all other shards before the other fiber
|
||||
// can acquire the lock on shard 0.
|
||||
co_await sharded_db.invoke_on(0, [&locks, &sharded_db] (auto& db) -> future<> {
|
||||
co_await sharded_db.invoke_on_all([&] (auto& db) -> future<> {
|
||||
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
|
||||
co_await sharded_db.invoke_on_others([&locks] (auto& db) -> future<> {
|
||||
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
|
||||
});
|
||||
});
|
||||
|
||||
co_return locks;
|
||||
}
|
||||
|
||||
|
||||
@@ -5466,9 +5466,10 @@ class scylla_compaction_tasks(gdb.Command):
|
||||
try:
|
||||
task_list = list(intrusive_list(cm['_tasks']))
|
||||
except gdb.error: # 6.2 compatibility
|
||||
task_list = [seastar_shared_ptr(t).get().dereference() for t in std_list(cm['_tasks'])]
|
||||
task_list = list(std_list(cm['_tasks']))
|
||||
|
||||
for task in task_list:
|
||||
task = seastar_shared_ptr(task).get().dereference()
|
||||
schema = schema_ptr(task['_compacting_table'].dereference()['_schema'])
|
||||
key = 'type={}, state={:5}, {}'.format(task['_type'], str(task['_state']), schema.table_name())
|
||||
task_hist.add(key)
|
||||
|
||||
@@ -438,10 +438,9 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
|
||||
|
||||
const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect());
|
||||
auto ps_ptr = qp.get_prepared(cache_key);
|
||||
shared_ptr<cql_transport::messages::result_message::prepared> prepared_msg;
|
||||
if (!ps_ptr) {
|
||||
prepared_msg = co_await qp.prepare(req, qs, cql3::internal_dialect());
|
||||
ps_ptr = prepared_msg->get_prepared();
|
||||
const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
|
||||
ps_ptr = msg_ptr->get_prepared();
|
||||
if (!ps_ptr) {
|
||||
on_internal_error(paxos_state::logger, "prepared statement is null");
|
||||
}
|
||||
@@ -450,8 +449,8 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
|
||||
-1, service::node_local_only::yes);
|
||||
const auto st = ps_ptr->statement;
|
||||
|
||||
const auto result_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
|
||||
co_return cql3::untyped_result_set(result_ptr);
|
||||
const auto msg_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
|
||||
co_return cql3::untyped_result_set(msg_ptr);
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
|
||||
@@ -4237,7 +4237,6 @@ public:
|
||||
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
||||
, _async_gate("topology_coordinator")
|
||||
{
|
||||
_lifecycle_notifier.register_subscriber(this);
|
||||
_db.get_notifier().register_listener(this);
|
||||
// When the delay_cdc_stream_finalization error injection is disabled
|
||||
// (test releases it), wake the topology coordinator so it retries
|
||||
@@ -4401,7 +4400,6 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
|
||||
}
|
||||
|
||||
future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
|
||||
auto tm = get_token_metadata_ptr();
|
||||
|
||||
locator::load_stats stats;
|
||||
@@ -4725,6 +4723,7 @@ future<> topology_coordinator::run() {
|
||||
|
||||
co_await _async_gate.close();
|
||||
co_await std::move(tablet_load_stats_refresher);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
co_await std::move(cdc_generation_publisher);
|
||||
co_await std::move(cdc_streams_gc);
|
||||
co_await std::move(gossiper_orphan_remover);
|
||||
@@ -4737,8 +4736,6 @@ future<> topology_coordinator::stop() {
|
||||
co_await _db.get_notifier().unregister_listener(this);
|
||||
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
|
||||
_topo_sm.on_tablet_split_ready = nullptr;
|
||||
co_await _lifecycle_notifier.unregister_subscriber(this);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
|
||||
// if topology_coordinator::run() is aborted either because we are not a
|
||||
// leader anymore, or we are shutting down as a leader, we have to handle
|
||||
@@ -4800,6 +4797,7 @@ future<> run_topology_coordinator(
|
||||
topology_cmd_rpc_tracker};
|
||||
|
||||
std::exception_ptr ex;
|
||||
lifecycle_notifier.register_subscriber(&coordinator);
|
||||
try {
|
||||
rtlogger.info("start topology coordinator fiber");
|
||||
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
|
||||
@@ -4820,7 +4818,7 @@ future<> run_topology_coordinator(
|
||||
}
|
||||
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
|
||||
}
|
||||
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
|
||||
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
|
||||
co_await coordinator.stop();
|
||||
}
|
||||
|
||||
|
||||
@@ -543,16 +543,11 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
// during SSTable writing and removed before sealing. If the write
|
||||
// failed before sealing, the file may still be on disk and must be
|
||||
// cleaned up explicitly.
|
||||
// The component is only defined for the `ms` sstable format; for
|
||||
// older formats it is absent from the component map and looking up
|
||||
// its filename would throw std::out_of_range.
|
||||
// Use file_exists() to avoid a C++ exception on the common path
|
||||
// where the file was already removed before sealing.
|
||||
if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) {
|
||||
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
|
||||
if (co_await file_exists(temp_hashes)) {
|
||||
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
|
||||
}
|
||||
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
|
||||
if (co_await file_exists(temp_hashes)) {
|
||||
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
|
||||
}
|
||||
if (sync) {
|
||||
co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
|
||||
|
||||
@@ -135,23 +135,7 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migr
|
||||
}
|
||||
|
||||
future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
|
||||
// _prepared_stmt is a checked_weak_ptr into the prepared statements
|
||||
// cache and can be invalidated by a concurrent purge (e.g. on a schema
|
||||
// change). cache_table_info() (re-)prepares and assigns _prepared_stmt,
|
||||
// but the pin protecting the entry is dropped when try_prepare()
|
||||
// returns. In release the chain of ready-future co_awaits back to here
|
||||
// resumes synchronously, but debug builds preempt on every co_await
|
||||
// even for ready futures, opening a window for a purge to drop the
|
||||
// entry and leave _prepared_stmt null. Loop until a synchronous
|
||||
// post-resume check finds _prepared_stmt valid; nothing can run between
|
||||
// that check and the dereference below. _insert_stmt is a strong
|
||||
// shared_ptr and is not affected by cache invalidation.
|
||||
while (true) {
|
||||
co_await cache_table_info(qp, mm, qs);
|
||||
if (_prepared_stmt) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
co_await cache_table_info(qp, mm, qs);
|
||||
auto opts = opt_maker();
|
||||
opts.prepare(_prepared_stmt->bound_names);
|
||||
co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);
|
||||
|
||||
76
test.py
76
test.py
@@ -11,11 +11,9 @@ from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import dataclasses
|
||||
import math
|
||||
import shlex
|
||||
import textwrap
|
||||
from bisect import insort
|
||||
from random import randint
|
||||
|
||||
import pytest
|
||||
@@ -185,8 +183,6 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
help="Specific byte limit for failure injection (random by default)")
|
||||
parser.add_argument('--skip-internet-dependent-tests', action="store_true",
|
||||
help="Skip tests which depend on artifacts from the internet.")
|
||||
parser.add_argument('--keep-duplicates', action='store_true', default=False,
|
||||
help="Do not deduplicate test arguments.")
|
||||
parser.add_argument("--pytest-arg", action='store', type=str,
|
||||
default=None, dest="pytest_arg",
|
||||
help="Additional command line arguments to pass to pytest, for example ./test.py --pytest-arg=\"-v -x\"")
|
||||
@@ -245,73 +241,6 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
return args
|
||||
|
||||
|
||||
# TODO: Remove _CollectionArgument and _deduplicate_test_args once we update
|
||||
# to pytest 9.x, which fixes argument deduplication:
|
||||
# https://github.com/pytest-dev/pytest/issues/12083
|
||||
@dataclasses.dataclass(frozen=True, order=True)
|
||||
class _CollectionArgument:
|
||||
"""Resolved collection argument for deduplication.
|
||||
|
||||
A version-independent subset of pytest's CollectionArgument that
|
||||
includes the fields needed for normalization (parametrization and
|
||||
original_index were added in pytest 9.0).
|
||||
|
||||
``a in b`` means ``b`` subsumes (contains) ``a``. Adapted from
|
||||
pytest 9.0.3 ``_pytest.main.is_collection_argument_subsumed_by``.
|
||||
"""
|
||||
path: pathlib.Path
|
||||
parts: tuple[str, ...]
|
||||
parametrization: str
|
||||
original_index: int
|
||||
|
||||
def __contains__(self, other: _CollectionArgument) -> bool:
|
||||
if self.path != other.path:
|
||||
return not self.parts and other.path.is_relative_to(self.path)
|
||||
if len(self.parts) > len(other.parts) or other.parts[:len(self.parts)] != self.parts:
|
||||
return False
|
||||
return not self.parametrization or self.parametrization == other.parametrization
|
||||
|
||||
|
||||
def _deduplicate_test_args(args: list[str]) -> list[str]:
|
||||
"""Remove duplicate and subsumed test arguments.
|
||||
|
||||
Resolves and normalizes CLI test arguments, then applies the normalization
|
||||
algorithm from pytest 9.0.3 to remove exact duplicates and arguments whose
|
||||
paths are contained within another argument's path.
|
||||
For example, ``["test/cql", "test/cql/lua_test.cql"]`` becomes ``["test/cql"]``.
|
||||
"""
|
||||
if not args:
|
||||
return args
|
||||
invocation_path = pathlib.Path.cwd()
|
||||
resolved_sorted: list[_CollectionArgument] = []
|
||||
unresolved_indices: set[int] = set()
|
||||
for i, arg in enumerate(args):
|
||||
# Adapted from pytest 9.0.3 _pytest.main.resolve_collection_argument.
|
||||
base, squacket, rest = arg.partition("[")
|
||||
strpath, *parts = base.split("::")
|
||||
fspath = pathlib.Path(os.path.abspath(invocation_path / strpath))
|
||||
if not fspath.exists():
|
||||
# Keep unresolved args — let pytest report the error.
|
||||
unresolved_indices.add(i)
|
||||
continue
|
||||
insort(resolved_sorted, _CollectionArgument(
|
||||
path=fspath,
|
||||
parts=tuple(parts),
|
||||
parametrization=squacket + rest,
|
||||
original_index=i,
|
||||
))
|
||||
|
||||
# Normalize: remove duplicates and subsumed arguments using an O(n log n)
|
||||
# sort-based algorithm adapted from pytest 9.0.3.
|
||||
normalized = resolved_sorted[:1]
|
||||
for ca in resolved_sorted[1:]:
|
||||
if ca not in normalized[-1]:
|
||||
normalized.append(ca)
|
||||
|
||||
kept_indices = {ca.original_index for ca in normalized} | unresolved_indices
|
||||
return [arg for i, arg in enumerate(args) if i in kept_indices]
|
||||
|
||||
|
||||
def run_pytest(options: argparse.Namespace) -> int:
|
||||
# When tests are executed in parallel on different hosts, we need to distinguish results from them.
|
||||
# So HOST_ID needed to not overwrite results from different hosts during Jenkins will copy to one directory.
|
||||
@@ -320,8 +249,7 @@ def run_pytest(options: argparse.Namespace) -> int:
|
||||
|
||||
report_dir = temp_dir / 'report'
|
||||
junit_output_file = report_dir / f'pytest_cpp_{HOST_ID}.xml'
|
||||
files_to_run = options.name if options.keep_duplicates else _deduplicate_test_args(options.name)
|
||||
files_to_run = files_to_run or [str(TOP_SRC_DIR / 'test/')]
|
||||
files_to_run = options.name or [str(TOP_SRC_DIR / 'test/')]
|
||||
args = [
|
||||
'--color=yes',
|
||||
f'--repeat={options.repeat}',
|
||||
@@ -341,8 +269,6 @@ def run_pytest(options: argparse.Namespace) -> int:
|
||||
])
|
||||
if options.verbose:
|
||||
args.append('-v')
|
||||
if options.keep_duplicates:
|
||||
args.append('--keep-duplicates')
|
||||
if options.quiet:
|
||||
args.append('--quiet')
|
||||
args.extend(['-p','no:sugar'])
|
||||
|
||||
@@ -150,8 +150,6 @@ add_scylla_test(lister_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(locator_topology_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(lock_tables_metadata_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(log_heap_test
|
||||
KIND BOOST)
|
||||
add_scylla_test(logalloc_standard_allocator_segment_pool_backend_test
|
||||
@@ -376,7 +374,7 @@ add_scylla_test(combined_tests
|
||||
sstable_compression_config_test.cc
|
||||
sstable_directory_test.cc
|
||||
sstable_set_test.cc
|
||||
sstable_tablet_streaming_test.cc
|
||||
sstable_tablet_streaming.cc
|
||||
statement_restrictions_test.cc
|
||||
storage_proxy_test.cc
|
||||
tablets_test.cc
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <seastar/core/with_timeout.hh>
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
// Test that two lock_tables_metadata calls don't deadlock
|
||||
SEASTAR_TEST_CASE(test_lock_tables_metadata_deadlock) {
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
try {
|
||||
// Repeat the test scenario to increase the chance of hitting the deadlock.
|
||||
// If no deadlock occurs, each repetition should complete within a fraction of a second,
|
||||
// so even with 100 repetitions, the total test time should be reasonable.
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
with_timeout(lowres_clock::now() + 30s,
|
||||
when_all_succeed(
|
||||
e.local_db().lock_tables_metadata(e.db()).discard_result(),
|
||||
e.local_db().lock_tables_metadata(e.db()).discard_result()
|
||||
)).get();
|
||||
}
|
||||
} catch (seastar::timed_out_error&) {
|
||||
fmt::print(stderr, "FAIL: lock_tables_metadata deadlocked (timed out after 30s)\n");
|
||||
_exit(1);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -246,33 +246,6 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
|
||||
});
|
||||
}
|
||||
|
||||
// Reproducer for SCYLLADB-1697
|
||||
SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
|
||||
return sstables::test_env::do_with_async([] (test_env& env) {
|
||||
for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
|
||||
testlog.info("Testing sstable version: {}", version);
|
||||
auto sst = make_sstable_for_this_shard([&env, version] {
|
||||
return env.make_sstable(test_table_schema(), version);
|
||||
});
|
||||
|
||||
// Sanity: the TOC was written, otherwise the assertion below would be vacuous.
|
||||
BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());
|
||||
|
||||
sst->unlink().get();
|
||||
|
||||
std::vector<sstring> remaining;
|
||||
lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
|
||||
[&remaining] (fs::path, directory_entry de) {
|
||||
remaining.push_back(de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_MESSAGE(remaining.empty(),
|
||||
fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Test the absence of TOC. Behavior is controllable by a flag
|
||||
SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
|
||||
return sstables::test_env::do_with_async([] (test_env& env) {
|
||||
|
||||
@@ -11,11 +11,13 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.dtest.ccmlib.common import logger
|
||||
from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
|
||||
|
||||
@@ -27,10 +29,6 @@ class ScyllaCluster:
|
||||
self.manager = manager
|
||||
self.scylla_mode = scylla_mode
|
||||
self._config_options = {}
|
||||
# Cached ScyllaNode instances. Nodes are appended by _add_nodes()
|
||||
# in the order they are created by servers_add().
|
||||
self._nodes: list[ScyllaNode] = []
|
||||
self._next_node_num: int = 1
|
||||
|
||||
if self.scylla_mode == "debug":
|
||||
self.default_wait_other_notice_timeout = 600
|
||||
@@ -41,20 +39,19 @@ class ScyllaCluster:
|
||||
|
||||
self.force_wait_for_cluster_start = force_wait_for_cluster_start
|
||||
|
||||
def _add_nodes(self, servers: list) -> None:
|
||||
"""Create ScyllaNode instances for the given servers and cache them."""
|
||||
for server in servers:
|
||||
name = f"node{self._next_node_num}"
|
||||
self._next_node_num += 1
|
||||
self._nodes.append(ScyllaNode(
|
||||
cluster=self, server=server, name=name))
|
||||
@staticmethod
|
||||
def _sorted_nodes(servers: Iterable[ServerInfo]) -> list[ServerInfo]:
|
||||
return sorted(servers, key=lambda s: s.server_id)
|
||||
|
||||
@property
|
||||
def nodes(self) -> dict[str, ScyllaNode]:
|
||||
return {node.name: node for node in self.nodelist()}
|
||||
|
||||
def nodelist(self) -> list[ScyllaNode]:
|
||||
return list(self._nodes)
|
||||
return [
|
||||
ScyllaNode(cluster=self, server=server, name=f"node{n}")
|
||||
for n, server in enumerate(self._sorted_nodes(self.manager.all_servers()), start=1)
|
||||
]
|
||||
|
||||
def get_node_ip(self, nodeid: int) -> str:
|
||||
return self.nodelist()[nodeid-1].address()
|
||||
@@ -64,16 +61,16 @@ class ScyllaCluster:
|
||||
self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra")
|
||||
match nodes:
|
||||
case int():
|
||||
self._add_nodes(self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1"))
|
||||
self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")
|
||||
case list():
|
||||
for dc, n_nodes in enumerate(nodes, start=1):
|
||||
dc_name = f"dc{dc}"
|
||||
self._add_nodes(self.manager.servers_add(
|
||||
self.manager.servers_add(
|
||||
servers_num=n_nodes,
|
||||
config=self._config_options,
|
||||
start=False,
|
||||
auto_rack_dc=dc_name
|
||||
))
|
||||
)
|
||||
case dict():
|
||||
# Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}}
|
||||
for dc, dc_nodes in nodes.items():
|
||||
@@ -82,7 +79,7 @@ class ScyllaCluster:
|
||||
for rack, rack_nodes in dc_nodes.items():
|
||||
if not isinstance(rack_nodes, int):
|
||||
raise RuntimeError(f"Unsupported topology specification: {nodes}")
|
||||
self._add_nodes(self.manager.servers_add(
|
||||
self.manager.servers_add(
|
||||
servers_num=rack_nodes,
|
||||
config=self._config_options,
|
||||
property_file={
|
||||
@@ -90,7 +87,7 @@ class ScyllaCluster:
|
||||
"rack": rack,
|
||||
},
|
||||
start=False,
|
||||
))
|
||||
)
|
||||
case _:
|
||||
raise RuntimeError(f"Unsupported topology specification: {nodes}")
|
||||
|
||||
@@ -230,6 +227,11 @@ class ScyllaCluster:
|
||||
def flush(self) -> None:
|
||||
self.nodetool("flush")
|
||||
|
||||
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
|
||||
for node in self.nodelist():
|
||||
if node.is_running():
|
||||
node.compact(keyspace=keyspace, tables=tables)
|
||||
|
||||
@staticmethod
|
||||
def debug(message: str) -> None:
|
||||
logger.debug(message)
|
||||
|
||||
@@ -17,7 +17,6 @@ from itertools import chain
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any
|
||||
import logging
|
||||
|
||||
from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
|
||||
from test.pylib.internal_types import ServerUpState
|
||||
@@ -29,9 +28,6 @@ if TYPE_CHECKING:
|
||||
from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
|
||||
|
||||
|
||||
logger = logging.getLogger("scylla_node")
|
||||
|
||||
|
||||
NODETOOL_STDERR_IGNORED_PATTERNS = (
|
||||
re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
|
||||
re.compile(
|
||||
@@ -115,6 +111,7 @@ class ScyllaNode:
|
||||
self.data_center = server.datacenter
|
||||
self.rack = server.rack
|
||||
|
||||
self._hostid = None
|
||||
self._smp_set_during_test = None
|
||||
self._smp = None
|
||||
self._memory = None
|
||||
@@ -153,20 +150,15 @@ class ScyllaNode:
|
||||
return self.cluster.scylla_mode
|
||||
|
||||
def set_smp(self, smp: int) -> None:
|
||||
logger.debug(f"Setting smp: {self=} {smp=}")
|
||||
self._smp_set_during_test = smp
|
||||
|
||||
def smp(self) -> int:
|
||||
logger.debug(f"Getting smp: {self=} _smp_set_during_test={self._smp_set_during_test} _smp={self._smp} {DEFAULT_SMP=}")
|
||||
return self._smp_set_during_test or self._smp or DEFAULT_SMP
|
||||
|
||||
def memory(self) -> int:
|
||||
return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU
|
||||
|
||||
def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
|
||||
if not memory and not smp:
|
||||
return
|
||||
logger.debug(f"Adjusting smp={smp} memory={memory} current_smp={self._smp} current_memory={self._memory}")
|
||||
if memory:
|
||||
self._memory = memory // (smp or self.smp()) * self.smp()
|
||||
if smp:
|
||||
@@ -455,8 +447,6 @@ class ScyllaNode:
|
||||
|
||||
self.mark = self.mark_log()
|
||||
|
||||
logger.debug(f"Starting server: server_id={self.server_id} {scylla_args=} {scylla_env=}")
|
||||
|
||||
self.cluster.manager.server_start(
|
||||
server_id=self.server_id,
|
||||
seeds=None if self.bootstrap else [self.address()],
|
||||
@@ -476,6 +466,9 @@ class ScyllaNode:
|
||||
if wait_for_binary_proto:
|
||||
self.wait_for_binary_interface(from_mark=self.mark)
|
||||
|
||||
if not self._hostid:
|
||||
self.hostid()
|
||||
|
||||
if wait_other_notice:
|
||||
timeout = self.cluster.default_wait_other_notice_timeout
|
||||
for node, mark in marks:
|
||||
@@ -658,11 +651,12 @@ class ScyllaNode:
|
||||
cmd.append(table)
|
||||
self.nodetool(" ".join(cmd), **kwargs)
|
||||
|
||||
def compact(self, keyspace: str = "", tables: str | None = ()) -> None:
|
||||
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
|
||||
compact_cmd = ["compact"]
|
||||
if keyspace:
|
||||
compact_cmd.append(keyspace)
|
||||
compact_cmd += tables
|
||||
if tables:
|
||||
compact_cmd.extend(tables)
|
||||
self.nodetool(" ".join(compact_cmd))
|
||||
|
||||
def drain(self, block_on_log: bool = False) -> None:
|
||||
@@ -835,10 +829,13 @@ class ScyllaNode:
|
||||
assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest
|
||||
assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest
|
||||
|
||||
try:
|
||||
return self.cluster.manager.get_host_id(server_id=self.server_id)
|
||||
except Exception as exc:
|
||||
self.error(f"Failed to get hostid: {exc}")
|
||||
if not self._hostid:
|
||||
try:
|
||||
self._hostid = self.cluster.manager.get_host_id(server_id=self.server_id)
|
||||
except Exception as exc:
|
||||
self.error(f"Failed to get hostid: {exc}")
|
||||
|
||||
return self._hostid
|
||||
|
||||
def rmtree(self, path: str | Path) -> None:
|
||||
"""Delete a directory content without removing the directory.
|
||||
|
||||
@@ -34,6 +34,7 @@ def pytest_addoption(parser: Parser) -> None:
|
||||
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None)
|
||||
parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)")
|
||||
parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster")
|
||||
parser.addoption("--compaction-strategy", action="store", default=None, help="Compaction strategy to use in tests that support it (e.g. wide_rows_test.py). One of LeveledCompactionStrategy, SizeTieredCompactionStrategy, TimeWindowCompactionStrategy, or IncrementalCompactionStrategy. If not set, a random strategy is chosen per test.")
|
||||
|
||||
|
||||
def pytest_configure(config: Config) -> None:
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
#
|
||||
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
from dtest_class import Tester
|
||||
|
||||
logger = logging.getLogger(__file__)
|
||||
|
||||
|
||||
@pytest.mark.single_node
|
||||
class TestSetSmp(Tester):
|
||||
"""Test that node.set_smp() properly persists across restarts."""
|
||||
|
||||
def _get_smp_from_log(self, node, from_mark=None):
|
||||
"""Extract smp value from the node's log by looking at the SHARD_COUNT gossip value."""
|
||||
matches = node.grep_log(r"SHARD_COUNT : Value\((\d+),\d+\)", from_mark=from_mark)
|
||||
assert matches, "Could not find SHARD_COUNT in node log"
|
||||
# Return the last match (most recent start)
|
||||
return int(matches[-1][1].group(1))
|
||||
|
||||
def test_set_smp(self):
|
||||
"""Verify that set_smp() takes effect on the next start."""
|
||||
cluster = self.cluster
|
||||
cluster.populate(1).start(wait_for_binary_proto=True)
|
||||
node1 = cluster.nodelist()[0]
|
||||
|
||||
default_smp = self._get_smp_from_log(node1)
|
||||
|
||||
cluster.stop()
|
||||
|
||||
# set_smp to a different value and restart without jvm_args
|
||||
target_smp = 1 if default_smp != 1 else 2
|
||||
node1.set_smp(target_smp)
|
||||
mark = node1.mark_log()
|
||||
cluster.start(wait_for_binary_proto=True)
|
||||
|
||||
node1 = cluster.nodelist()[0]
|
||||
actual_smp = self._get_smp_from_log(node1, from_mark=mark)
|
||||
assert actual_smp == target_smp, \
|
||||
f"Expected smp={target_smp} after set_smp({target_smp}), got {actual_smp}"
|
||||
@@ -263,3 +263,25 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
|
||||
sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key]))
|
||||
|
||||
assert sorted_list1 == sorted_list2
|
||||
|
||||
|
||||
def assert_equal_more_with_deviation(actual, expect, deviation_perc):
|
||||
"""
|
||||
Assert actual is within inclusive interval [expected...expected+deviation_perc]
|
||||
@param actual Value inspected
|
||||
@param expect Beginning of expected interval
|
||||
@param deviation_perc allowed percent increase
|
||||
"""
|
||||
deviation_high = (expect * (100 + deviation_perc)) / 100
|
||||
assert expect <= actual <= deviation_high, f"Expect result interval {expect}..{deviation_high}, received {actual}"
|
||||
|
||||
|
||||
def assert_less_equal_lists(actual_list, expected_list, msg=None):
|
||||
"""
|
||||
Assert actual_list is a subset of the expected list, prints hardcoded or parameterized error message
|
||||
@param actual_list Inspected list
|
||||
@param expected_list List that supposed to include actual_list
|
||||
@param msg Configured message default None.
|
||||
"""
|
||||
standardMsg = msg or f"{actual_list} not less than or equal to {expected_list}"
|
||||
assert set(actual_list) <= set(expected_list), standardMsg
|
||||
|
||||
1455
test/cluster/dtest/wide_rows_test.py
Normal file
1455
test/cluster/dtest/wide_rows_test.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -29,16 +29,12 @@ import pytest
|
||||
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
|
||||
from cassandra.connection import UnixSocketEndPoint
|
||||
from cassandra.policies import WhiteListRoundRobinPolicy
|
||||
from cassandra.query import BatchStatement, BatchType, SimpleStatement, named_tuple_factory
|
||||
|
||||
from test.cluster.conftest import cluster_con
|
||||
from test.cluster.dtest.dtest_class import create_ks, wait_for
|
||||
from test.cluster.dtest.tools.assertions import assert_invalid
|
||||
from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
|
||||
|
||||
from test.pylib.driver_utils import safe_driver_shutdown
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.skip_types import skip_env
|
||||
@@ -277,7 +273,6 @@ class AuditEntry:
|
||||
statement: str
|
||||
table: str
|
||||
user: str
|
||||
source: str = "127.0.0.1"
|
||||
|
||||
|
||||
class AuditBackend:
|
||||
@@ -454,13 +449,6 @@ class AuditBackendSyslog(AuditBackend):
|
||||
entries.append(self.line_to_row(line, idx))
|
||||
return { self.audit_mode(): entries }
|
||||
|
||||
@staticmethod
|
||||
def _parse_address(addr_port):
|
||||
"""Extract IP from 'ip:port' (IPv4) or '[ip]:port' (IPv6)."""
|
||||
if addr_port.startswith("["):
|
||||
return addr_port[1:addr_port.index("]")]
|
||||
return addr_port.split(":")[0]
|
||||
|
||||
def line_to_row(self, line, idx):
|
||||
metadata, data = line.split(": ", 1)
|
||||
data = "".join(data.splitlines()) # Remove newlines
|
||||
@@ -472,9 +460,9 @@ class AuditBackendSyslog(AuditBackend):
|
||||
# and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59)
|
||||
date = datetime.datetime(2000, 1, 1, 0, 0)
|
||||
|
||||
node = self._parse_address(match.group("node"))
|
||||
node = match.group("node").split(":")[0]
|
||||
statement = match.group("query").replace("\\", "")
|
||||
source = self._parse_address(match.group("client_ip"))
|
||||
source = match.group("client_ip").split(":")[0]
|
||||
event_time = uuid.UUID(int=idx)
|
||||
t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username"))
|
||||
return t
|
||||
@@ -594,7 +582,6 @@ class CQLAuditTester(AuditTester):
|
||||
user="anonymous",
|
||||
cl="ONE",
|
||||
error=False,
|
||||
source="127.0.0.1",
|
||||
):
|
||||
self.assert_audit_row_fields(row)
|
||||
assert row.node in self.server_addresses
|
||||
@@ -603,7 +590,7 @@ class CQLAuditTester(AuditTester):
|
||||
assert row.error == error
|
||||
assert row.keyspace_name == ks
|
||||
assert row.operation == statement
|
||||
assert row.source == source
|
||||
assert row.source == "127.0.0.1"
|
||||
assert row.table_name == table
|
||||
assert row.username == user
|
||||
|
||||
@@ -827,7 +814,7 @@ class CQLAuditTester(AuditTester):
|
||||
sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
|
||||
assert len(sorted_new_rows) == len(expected_entries)
|
||||
for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
|
||||
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error, entry.source)
|
||||
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
|
||||
|
||||
async def verify_keyspace(self, audit_settings=None, helper=None):
|
||||
"""
|
||||
@@ -1867,44 +1854,6 @@ class CQLAuditTester(AuditTester):
|
||||
finally:
|
||||
session.execute("DROP KEYSPACE IF EXISTS kss")
|
||||
|
||||
# Unix domain sockets have no IP peer address. Seastar's
|
||||
# socket_address::addr() falls through to the default case for
|
||||
# AF_UNIX and returns a zero-initialised in6_addr, i.e. "::".
|
||||
MAINTENANCE_SOCKET_SOURCE = "::"
|
||||
|
||||
async def _test_audit_maintenance_socket_user_creation(self, manager, helper_class):
|
||||
with helper_class() as helper:
|
||||
session = await self.prepare(
|
||||
user="cassandra", password="cassandra",
|
||||
helper=helper,
|
||||
audit_settings={**helper.audit_default_settings, "audit_categories": "DCL", "audit_keyspaces": ""},
|
||||
create_keyspace=False,
|
||||
)
|
||||
|
||||
servers = await manager.running_servers()
|
||||
server = servers[0]
|
||||
socket_path = await manager.server_get_maintenance_socket_path(server.server_id)
|
||||
|
||||
logger.info("Connecting to maintenance socket")
|
||||
endpoint = UnixSocketEndPoint(socket_path)
|
||||
maint_cluster = cluster_con([endpoint],
|
||||
load_balancing_policy=WhiteListRoundRobinPolicy([endpoint]))
|
||||
maint_session = maint_cluster.connect()
|
||||
|
||||
role_name = "audit_test_admin"
|
||||
create_stmt = f"CREATE ROLE {role_name} WITH PASSWORD = 'secret' AND SUPERUSER = true AND LOGIN = true"
|
||||
expected_operation = f"CREATE ROLE {role_name} WITH PASSWORD = '***' AND SUPERUSER = true AND LOGIN = true"
|
||||
|
||||
logger.info("Creating superuser via maintenance socket and verifying audit entry")
|
||||
expected_entries = [AuditEntry(category="DCL", statement=expected_operation,
|
||||
user="anonymous", table="", ks="", cl="LOCAL_QUORUM", error=False,
|
||||
source=self.MAINTENANCE_SOCKET_SOURCE)]
|
||||
with self.assert_entries_were_added(session, expected_entries):
|
||||
maint_session.execute(create_stmt)
|
||||
|
||||
logger.info("Cleaning up created role")
|
||||
maint_session.execute(f"DROP ROLE IF EXISTS {role_name}")
|
||||
safe_driver_shutdown(maint_cluster)
|
||||
|
||||
# AuditBackendTable, no auth, rf=1
|
||||
|
||||
@@ -1997,14 +1946,6 @@ async def test_service_level_statements_standalone(manager: ManagerClient):
|
||||
await CQLAuditTester(manager)._test_service_level_statements()
|
||||
|
||||
|
||||
async def test_audit_maintenance_socket_user_creation(manager: ManagerClient):
|
||||
"""Verify that creating a superuser via the maintenance socket is audited."""
|
||||
t = CQLAuditTester(manager)
|
||||
await t._test_audit_maintenance_socket_user_creation(manager, AuditBackendTable)
|
||||
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
|
||||
await t._test_audit_maintenance_socket_user_creation(manager, Syslog)
|
||||
|
||||
|
||||
# AuditBackendSyslog, no auth, rf=1
|
||||
|
||||
async def test_audit_syslog_noauth(manager: ManagerClient):
|
||||
|
||||
@@ -48,5 +48,6 @@ run_in_dev:
|
||||
- dtest/commitlog_test
|
||||
- dtest/cfid_test
|
||||
- dtest/rebuild_test
|
||||
- dtest/wide_rows_test
|
||||
run_in_debug:
|
||||
- random_failures/test_random_failures
|
||||
|
||||
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
|
||||
from cassandra.query import ConsistencyLevel, SimpleStatement
|
||||
@@ -880,30 +880,41 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
|
||||
# affected replica but process the UNREPAIRED sstable on the others, so the classification
|
||||
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
|
||||
# on the affected replica leading to data resurrection.
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||
cmdline = ['--hinted-handoff-enabled', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||
|
||||
class _LeadershipTransferred(Exception):
|
||||
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
|
||||
pass
|
||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||
)
|
||||
|
||||
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
|
||||
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
|
||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||
for s in servers:
|
||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||
|
||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||
|
||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||
# S0'(repaired_at=1) on all nodes.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
|
||||
# These will be the subject of repair 2.
|
||||
repair2_keys = list(range(current_key, current_key + 10))
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
current_key += 10
|
||||
|
||||
Returns the next current_key value.
|
||||
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
|
||||
restart, signalling the caller to retry.
|
||||
"""
|
||||
# Ensure servers[1] is not the topology coordinator. If the coordinator is
|
||||
# restarted, the Raft leader dies, a new election occurs, and the new
|
||||
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
|
||||
# and marking post-repair data as repaired. That legitimate re-repair masks
|
||||
# the compaction-merge bug this test detects.
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
if coord_serv == servers[1]:
|
||||
other = next(s for s in servers if s != servers[1])
|
||||
await ensure_group0_leader_on(manager, other)
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
coord_mark = await coord_log.mark()
|
||||
|
||||
@@ -967,16 +978,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
||||
await manager.server_start(target.server_id)
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
# Check if leadership transferred to servers[1] during the restart.
|
||||
# If so, the new coordinator will re-initiate repair, masking the bug.
|
||||
new_coord = await get_topology_coordinator(manager)
|
||||
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
|
||||
if new_coord_serv == servers[1]:
|
||||
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
|
||||
await manager.api.wait_task(servers[0].ip_addr, task_id)
|
||||
raise _LeadershipTransferred(
|
||||
"servers[1] became topology coordinator after restart")
|
||||
|
||||
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
|
||||
# confirming that the bug was triggered (S1' and E merged during the race window).
|
||||
deadline = time.time() + 60
|
||||
@@ -999,7 +1000,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
||||
if not compaction_ran:
|
||||
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
|
||||
"the bug was not triggered. Skipping assertion.")
|
||||
return current_key
|
||||
return
|
||||
|
||||
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
|
||||
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
|
||||
@@ -1030,9 +1031,8 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")

# servers[0] and servers[2] were never restarted and the coordinator stayed
# alive throughout, so no re-repair could have flushed their memtables.
# Post-repair keys must NOT appear in repaired sstables on these servers.
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,54 +1053,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
return current_key

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)

# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)

# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')

scylla_path = await manager.server_get_exe(servers[0].server_id)

# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by prepare_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10

# If leadership transfers to servers[1] between our coordinator check and the
# restart, the coordinator change masks the bug. Detect and retry.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
current_key = await _do_race_window_promotes_unrepaired_data(
manager, servers, cql, ks, token, scylla_path, current_key)
return
except _LeadershipTransferred as e:
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")

pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
"could not run the test without coordinator interference.")

# ----------------------------------------------------------------------------
# Tombstone GC safety tests
@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
from test.cluster.util import get_topology_coordinator, trigger_stepdown

import pytest
import logging
@@ -83,78 +83,3 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager)
if coord3:
break


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
"""Verify that _tablet_load_stats_refresh is properly joined during
topology coordinator shutdown, even when a schema change notification
triggers a refresh between run() completing and stop() being called.

Reproduces the scenario using two injection points:
- topology_coordinator_pause_before_stop: pauses after run() finishes
but before stop() is called
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
so it's still in-flight during shutdown

Without the join in stop(), the refresh task outlives the coordinator
and accesses freed memory.
"""
servers = await manager.servers_add(3)
await manager.get_ready_cql(servers)

async with new_test_keyspace(manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
coord = await get_topology_coordinator(manager)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
coord_idx = host_ids.index(coord)
coord_server = servers[coord_idx]

log = await manager.server_open_log(coord_server.server_id)
mark = await log.mark()

# Injection B: pause between run() returning and stop() being called.
await manager.api.enable_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)

# Stepdown causes the topology coordinator to abort and shut down.
logger.info("Triggering stepdown on coordinator")
await trigger_stepdown(manager, coord_server)

# Wait for injection B to fire. The coordinator has finished run() but
# the schema change listener is still registered.
mark, _ = await log.wait_for(
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)

# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
# Enable it now so it only catches the notification-triggered call.
await manager.api.enable_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)

# CREATE TABLE fires on_create_column_family on the old coordinator which
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
# via with_scheduling_group on the gossip scheduling group.
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)

# Release injection B: coordinator proceeds through stop().
# Without the fix, stop() returns quickly and run_topology_coordinator
# frees the topology_coordinator frame. With the fix, stop() blocks at
# _tablet_load_stats_refresh.join() until injection A is released.
logger.info("Releasing injection B: coordinator will stop")
await manager.api.message_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop")

# Release injection A: refresh_tablet_load_stats() resumes and accesses
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
# points to freed memory and ASan detects heap-use-after-free.
logger.info("Releasing injection A: refresh resumes")
await manager.api.message_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause")

# If the bug is present, the node crashed. read_barrier will fail.
await read_barrier(manager.api, coord_server.ip_addr)
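The property this test exercises generalizes: a fire-and-forget background task triggered by a notification must be joined before its owner is torn down. A minimal asyncio sketch of that pattern, with hypothetical names; an editorial illustration, not code from this diff:

import asyncio

class Owner:
    def __init__(self):
        self._refresh_task = None
        self._state = "live"          # stands in for _shared_tm

    def on_notification(self):
        # Fire-and-forget trigger, analogous to _tablet_load_stats_refresh.trigger().
        self._refresh_task = asyncio.ensure_future(self._refresh())

    async def _refresh(self):
        await asyncio.sleep(0.01)     # stands in for the real load-stats fetch
        assert self._state == "live"  # without the join in stop(), this can observe torn-down state

    async def stop(self):
        # The join under test: wait out any in-flight refresh before teardown.
        if self._refresh_task is not None:
            await self._refresh_task
        self._state = "freed"

async def main():
    owner = Owner()
    owner.on_notification()           # a refresh is now in flight
    await owner.stop()                # joins it before state is torn down

asyncio.run(main())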
@@ -435,9 +435,8 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -452,44 +451,43 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
host_ids = [await manager.get_host_id(s.server_id) for s in servers]

cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'

await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
await cql.run_async("create table ks2.t (pk int primary key);")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert repl['dc1'] == '1'
assert repl['dc2'] == '2'

await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks3.t (pk int primary key);")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
repl = await get_replication_options("ks3")
assert repl['dc1'] == '1'

await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks4.t (pk int primary key);")
repl = await get_replication_options("ks4", host, servers[0].ip_addr)
repl = await get_replication_options("ks4")
assert repl['dc1'] == '1'

await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks5.t (pk int primary key);")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
repl = await get_replication_options("ks5")
assert repl['dc1'] == '2'
assert repl['dc2'] == '2'

await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks6.t (pk int primary key);")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
repl = await get_replication_options("ks6")
assert repl['dc1'] == '2'

[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]

await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b']

tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -499,7 +497,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
assert r.replicas[0][0] == host_ids[1]

await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert repl['dc1'] == ['rack1a']
assert len(repl['dc2']) == 2
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -525,13 +523,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
pass

await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
repl = await get_replication_options("ks5")
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
assert repl['dc2'] == '2'

await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
repl = await get_replication_options("ks6")
assert repl['dc1'] == '2'
assert len(repl['dc2']) == 1
assert repl['dc2'][0] == 'rack2a'
@@ -539,9 +537,8 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -554,11 +551,10 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]

cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'

await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -578,19 +574,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
servers = servers[0:-1]

await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b']

logging.info("Rolling restart")
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])

await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']

await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
repl = await get_replication_options("ks3")
assert len(repl['dc1']) == 1
assert len(repl['dc2']) == 1
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -606,7 +602,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
assert failed

await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert len(repl['dc1']) == 1
assert repl['dc1'][0] == 'rack1b'
assert len(repl['dc2']) == 1
@@ -1113,9 +1109,8 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -1133,11 +1128,10 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
host_ids = [await manager.get_host_id(s.server_id) for s in servers]

cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'

[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1171,7 +1165,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
failed = True
assert failed

repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'

@pytest.mark.asyncio
@@ -18,7 +18,6 @@
#include <seastar/testing/test_case.hh>

#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "ldap_common.hh"
#include "service/migration_manager.hh"
@@ -682,41 +681,3 @@ SEASTAR_TEST_CASE(ldap_config) {
},
make_ldap_config());
}

// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
auto cfg = make_ldap_config();
cfg->permissions_update_interval_in_ms.set(1);

auto call_count = seastar::make_lw_shared<int>(0);

co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
auto& cache = env.auth_cache().local();

testlog.info("Populating 50 cache entries");
for (int i = 0; i < 50; i++) {
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
cache.get_permissions(auth::role_or_anonymous(), r).get();
}

testlog.info("Installing slow permission loader (10ms per call)");
cache.set_permission_loader(
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
-> seastar::future<auth::permission_set> {
++(*call_count);
co_await seastar::sleep(std::chrono::milliseconds(10));
co_return auth::permission_set();
});

testlog.info("Waiting for pruner to start reloading");
while (*call_count == 0) {
seastar::sleep(std::chrono::milliseconds(1)).get();
}

testlog.info("Pruner started, letting teardown run");
}, cfg);

testlog.info("Loader called {} times", *call_count);
}
@@ -371,13 +371,9 @@ int scylla_simple_query_main(int argc, char** argv) {
audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) {
fmt::print("audit start failed: {}", e);
}).get();
audit::audit::start_storage(env.local_db().get_config()).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
auto results = do_cql_test(env, cfg);
aggregated_perf_results agg(results);
std::cout << agg << std::endl;
@@ -72,12 +72,16 @@ class CppFile(pytest.File, ABC):
self.test_name = self.path.stem

# Implement following properties as cached_property because they are read-only, and based on stash items which
# will be assigned in test/pylib/runner.py::pytest_collect_file() and modify_pytest_item() after instance creation.
# will be assigned in test/pylib/runner.py::pytest_collect_file() hook after a CppFile instance was created.

@cached_property
def build_mode(self) -> str:
return self.stash[BUILD_MODE]

@cached_property
def run_id(self) -> int:
return self.stash[RUN_ID]

@cached_property
def suite_config(self) -> dict[str, Any]:
return self.stash[TEST_SUITE].cfg
@@ -162,13 +166,9 @@ class CppTestCase(pytest.Item):
self.own_markers = [getattr(pytest.mark, mark_name) for mark_name in own_markers]
self.add_marker(pytest.mark.cpp)

@cached_property
def run_id(self) -> int:
return self.stash[RUN_ID]

def get_artifact_path(self, extra: str = "", suffix: str = "") -> pathlib.Path:
return self.parent.log_dir / ".".join(
(self.path.relative_to(TEST_DIR).with_suffix("") / f"{self.name}{extra}.{self.run_id}{suffix}").parts
(self.path.relative_to(TEST_DIR).with_suffix("") / f"{self.name}{extra}.{self.parent.run_id}{suffix}").parts
)

def make_testpy_test_object_mock(self) -> SimpleNamespace:
@@ -179,7 +179,7 @@ class CppTestCase(pytest.Item):
return SimpleNamespace(
time_end=0,
time_start=0,
id=self.run_id,
id=self.parent.run_id,
mode=self.parent.build_mode,
success=False,
shortname=self.name,
@@ -15,7 +15,7 @@ import random
import sys
from argparse import BooleanOptionalAction
from collections import defaultdict
from itertools import chain, count
from itertools import chain, count, product
from functools import cache, cached_property
from pathlib import Path
from random import randint
@@ -106,7 +106,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
" '--logger-log-level raft=trace --default-log-level error'")
parser.addoption('--x-log2-compaction-groups', action="store", default="0", type=int,
help="Controls number of compaction groups to be used by Scylla tests. Value of 3 implies 8 groups.")
parser.addoption('--repeat', action="store", default=1, type=int,
parser.addoption('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution")

# Pass information about Scylla node from test.py to pytest.
@@ -162,10 +162,9 @@ def scylla_binary(testpy_test) -> Path:
return testpy_test.suite.scylla_exe


def pytest_collection_modifyitems(items: list[pytest.Item], config: pytest.Config) -> None:
run_ids = defaultdict(lambda: count(start=int(config.getoption("--run_id") or 1)))
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
for item in items:
modify_pytest_item(item=item, run_ids=run_ids)
modify_pytest_item(item=item)

suites_order = defaultdict(count().__next__) # number suites in order of appearance
@@ -286,10 +285,7 @@ def pytest_configure(config: pytest.Config) -> None:
pytest_log_dir.mkdir(parents=True, exist_ok=True)
if not _pytest_config.getoption("--save-log-on-success"):
for file in pytest_log_dir.glob("*"):
# This will help in case framework tests are executed with test.py even if it's the wrong way to run them.
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has led to a race
# condition, especially with repeat.
file.unlink(missing_ok=True)
file.unlink()

_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
@@ -317,15 +313,14 @@ def pytest_configure(config: pytest.Config) -> None:
os.environ["TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED"] = os.environ.get("TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED", str(random.randint(0, sys.maxsize)))
config.build_modes = get_modes_to_run(config)
repeat = int(config.getoption("--repeat"))

if testpy_run_id := config.getoption("--run_id"):
if config.getoption("--repeat") != 1:
if repeat != 1:
raise RuntimeError("Can't use --run_id and --repeat simultaneously.")


class DisabledFile(pytest.File):
def collect(self) -> list[pytest.Item]:
pytest.skip("All tests in this file are disabled in requested modes according to the suite config.")
config.run_ids = (testpy_run_id,)
else:
config.run_ids = tuple(range(1, repeat + 1))


@pytest.hookimpl(wrapper=True)
@@ -342,16 +337,19 @@ def pytest_collect_file(file_path: pathlib.Path,
mode for mode in build_modes
if not suite_config.is_test_disabled(build_mode=mode, path=file_path)
)
if repeats := [mode for mode in build_modes for _ in range(parent.config.getoption("--repeat"))]:
ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable(
ihook.pytest_collect_file(file_path=file_path, parent=parent) for _ in range(1, len(repeats))
)))
for build_mode, collector in zip(repeats, collectors, strict=True):
collector.stash[BUILD_MODE] = build_mode
collector.stash[TEST_SUITE] = suite_config
else:
collectors = [DisabledFile.from_parent(parent=parent, path=file_path)]
repeats = list(product(build_modes, parent.config.run_ids))

if not repeats:
return []

ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable(
ihook.pytest_collect_file(file_path=file_path, parent=parent) for _ in range(1, len(repeats))
)))
for (build_mode, run_id), collector in zip(repeats, collectors, strict=True):
collector.stash[BUILD_MODE] = build_mode
collector.stash[RUN_ID] = run_id
collector.stash[TEST_SUITE] = suite_config

parent.stash[REPEATING_FILES].remove(file_path)
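For reference, the (build_mode, run_id) fan-out above is plain itertools.product; a small standalone illustration with hypothetical values, not part of the diff:

from itertools import product

build_modes = ["dev", "release"]      # hypothetical modes
run_ids = tuple(range(1, 3 + 1))      # what config.run_ids holds for --repeat 3
repeats = list(product(build_modes, run_ids))
# One collector is created per (build_mode, run_id) pair, and the pair is
# stashed on that collector, so every repeat of every mode stays distinct.
assert repeats[:3] == [("dev", 1), ("dev", 2), ("dev", 3)]
assert len(repeats) == len(build_modes) * len(run_ids)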
@@ -425,7 +423,7 @@ class TestSuiteConfig:
TEST_SUITE = pytest.StashKey[TestSuiteConfig | None]()

_STASH_KEYS_TO_COPY = BUILD_MODE, TEST_SUITE
_STASH_KEYS_TO_COPY = BUILD_MODE, RUN_ID, TEST_SUITE


def get_params_stash(node: _pytest.nodes.Node) -> pytest.Stash | None:
@@ -435,12 +433,11 @@ def get_params_stash(node: _pytest.nodes.Node) -> pytest.Stash | None:
return parent.stash


def modify_pytest_item(item: pytest.Item, run_ids: defaultdict[tuple[str, str], count]) -> None:
def modify_pytest_item(item: pytest.Item) -> None:
params_stash = get_params_stash(node=item)

for key in _STASH_KEYS_TO_COPY:
item.stash[key] = params_stash[key]
item.stash[RUN_ID] = next(run_ids[(item.stash[BUILD_MODE], item._nodeid)])

suffix = f".{item.stash[BUILD_MODE]}.{item.stash[RUN_ID]}"
@@ -75,7 +75,6 @@ def test_no_bare_skip_markers_in_collection():
"--collect-only",
"--ignore=boost", "--ignore=raft",
"--ignore=ldap", "--ignore=vector_search",
"--ignore=unit",
"-p", "no:sugar"],
capture_output=True, text=True,
cwd=str(_TEST_ROOT),