mirror of https://github.com/scylladb/scylladb.git
synced 2026-04-29 12:47:02 +00:00

Compare commits: copilot/fi ... master (30 commits)

45b4834ac4, 7e14ea5ac8, 809f12f988, 3755c370ac, 543fb6a2db, b7bc2d89e6,
bc67dd0b82, 1616c71bf0, c4de2b3c9d, d9dd3bfe53, 5eaa979f35, 7430c1efd7,
b0f988afc4, a7e9c0e6d2, 3ea4af1c8c, 459e3970cd, 8756f7c068, 2615d0e8d8,
914b70c75b, 6b7ce5e244, 9d3d424d58, f2f4915e09, 92c09d106d, 8855e77465,
adf1e26bab, 37a547604f, c3e5285d45, f75e5ac65b, 6cb4c27f8c, 2503546251
@@ -194,22 +194,36 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
             std::move(audited_keyspaces),
             std::move(audited_tables),
             std::move(audited_categories),
-            std::cref(cfg))
-        .then([&cfg] {
-            if (!audit_instance().local_is_initialized()) {
-                return make_ready_future<>();
-            }
-            return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
-                return local_audit.start(cfg);
+            std::cref(cfg));
+}
+
+future<> audit::start_storage(const db::config& cfg) {
+    if (!audit_instance().local_is_initialized()) {
+        return make_ready_future<>();
+    }
+    return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
+        return local_audit._storage_helper_ptr->start(cfg).then([&local_audit] {
+            local_audit._storage_running = true;
         });
     });
 }
 
+future<> audit::stop_storage() {
+    if (!audit_instance().local_is_initialized()) {
+        return make_ready_future<>();
+    }
+    return audit_instance().invoke_on_all([] (audit& local_audit) {
+        local_audit._storage_running = false;
+        return local_audit._storage_helper_ptr->stop();
+    });
+}
+
 future<> audit::stop_audit() {
     if (!audit_instance().local_is_initialized()) {
         return make_ready_future<>();
     }
     return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) {
+        SCYLLA_ASSERT(!local_audit._storage_running);
         return local_audit.shutdown();
     }).then([] {
         return audit::audit::audit_instance().stop();
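The hunk above splits audit startup in two: start_audit() now only constructs and registers the per-shard audit objects, while the new start_storage()/stop_storage() pair brings the storage backend up and down and records readiness in _storage_running. Below is a minimal sketch of that lifecycle in plain asyncio rather than Seastar; the class and method names are illustrative, not ScyllaDB APIs.

# Sketch (assumption-laden, not ScyllaDB code) of the two-phase lifecycle:
# the audit object exists early, storage is started separately, and a flag
# records whether storage is ready.
import asyncio


class AuditSketch:
    def __init__(self):
        self.storage_running = False
        self.dropped = []
        self.written = []

    async def start_storage(self):
        # e.g. create the audit keyspace/table; needs the cluster to be joined
        await asyncio.sleep(0)
        self.storage_running = True

    async def stop_storage(self):
        self.storage_running = False
        await asyncio.sleep(0)

    async def log(self, entry):
        if not self.storage_running:
            # analogue of the on_internal_error_noexcept() guard: report and drop
            self.dropped.append(entry)
            return
        self.written.append(entry)


async def main():
    audit = AuditSketch()          # constructed early (start_audit)
    await audit.log("too early")   # dropped: storage not started yet
    await audit.start_storage()    # started later, after the cluster is joined
    await audit.log("recorded")
    await audit.stop_storage()
    assert audit.dropped == ["too early"] and audit.written == ["recorded"]


asyncio.run(main())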
@@ -223,14 +237,6 @@ audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& k
     return std::make_unique<audit_info>(cat, keyspace, table, batch);
 }
 
-future<> audit::start(const db::config& cfg) {
-    return _storage_helper_ptr->start(cfg);
-}
-
-future<> audit::stop() {
-    return _storage_helper_ptr->stop();
-}
-
 future<> audit::shutdown() {
     return make_ready_future<>();
 }
@@ -241,6 +247,12 @@ future<> audit::log(const audit_info& audit_info, const service::client_state& c
     const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
     socket_address client_ip = client_state.get_client_address().addr();
     socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
+    if (!_storage_running) {
+        on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
+                node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
+                audit_info.query(), client_ip, audit_info.table(), username));
+        return make_ready_future<>();
+    }
     if (logger.is_enabled(logging::log_level::debug)) {
         logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
             node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
@@ -286,6 +298,11 @@ future<> inspect(const audit_info_alternator& ai, const service::client_state& c
 
 future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
     socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
+    if (!_storage_running) {
+        on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}",
+                node_ip, client_ip, username, error ? "true" : "false"));
+        return make_ready_future<>();
+    }
     if (logger.is_enabled(logging::log_level::debug)) {
         logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
             node_ip, client_ip, username, error ? "true" : "false");
@@ -141,6 +141,7 @@ private:
     category_set _audited_categories;
 
     std::unique_ptr<storage_helper> _storage_helper_ptr;
+    bool _storage_running = false;
 
     const db::config& _cfg;
     utils::observer<sstring> _cfg_keyspaces_observer;
@@ -163,6 +164,8 @@ public:
         return audit_instance().local();
     }
     static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
+    static future<> start_storage(const db::config& cfg);
+    static future<> stop_storage();
     static future<> stop_audit();
     static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false);
     audit(locator::shared_token_metadata& stm,
@@ -174,8 +177,6 @@ public:
           category_set&& audited_categories,
           const db::config& cfg);
     ~audit();
-    future<> start(const db::config& cfg);
-    future<> stop();
     future<> shutdown();
     bool should_log(const audit_info& audit_info) const;
     bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const;
@@ -258,13 +258,11 @@ future<> ldap_role_manager::start() {
             } catch (const seastar::sleep_aborted&) {
                 co_return; // ignore
             }
-            co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
-                try {
-                    co_await c.reload_all_permissions();
-                } catch (...) {
-                    mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
-                }
-            });
+            try {
+                co_await _cache.reload_all_permissions();
+            } catch (...) {
+                mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
+            }
         }
     });
     return _std_mgr.start();
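The ldap_role_manager change above reloads permissions through the cache's own reload_all_permissions() and keeps swallowing failures with a warning so the pruner loop survives transient LDAP errors. A rough stand-alone sketch of that loop shape, in plain asyncio with invented names:

import asyncio
import logging

log = logging.getLogger("pruner-sketch")


async def pruner(reload_all_permissions, interval: float, stop: asyncio.Event):
    while not stop.is_set():
        try:
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
        if stop.is_set():
            return  # analogous to catching sleep_aborted and co_return
        try:
            await reload_all_permissions()
        except Exception:
            log.warning("Cache reload all permissions failed", exc_info=True)


async def demo():
    async def flaky_reload():
        raise RuntimeError("LDAP temporarily unreachable")

    stop = asyncio.Event()
    task = asyncio.create_task(pruner(flaky_reload, 0.01, stop))
    await asyncio.sleep(0.05)   # let a few reload attempts fail and be logged
    stop.set()
    await task


asyncio.run(demo())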
@@ -157,15 +157,12 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
             return create_legacy_keyspace_if_missing(mm);
         });
     }
-    co_await _role_manager->start();
-    if (this_shard_id() == 0) {
-        // Role manager and password authenticator have this odd startup
-        // mechanism where they asynchronously create the superuser role
-        // in the background. Correct password creation depends on role
-        // creation therefore we need to wait here.
-        co_await _role_manager->ensure_superuser_is_created();
-    }
-    co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
+    // Authorizer must be started before the permission loader is set,
+    // because the loader calls _authorizer->authorize().
+    // The loader must be set before starting the role manager, because
+    // LDAP role manager starts a pruner fiber that calls
+    // reload_all_permissions() which asserts _permission_loader is set.
+    co_await _authorizer->start();
     if (!_used_by_maintenance_socket) {
         // Maintenance socket mode can't cache permissions because it has
         // different authorizer. We can't mix cached permissions, they could be
@@ -174,12 +171,27 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
                 &service::get_uncached_permissions,
                 this, std::placeholders::_1, std::placeholders::_2));
     }
+    co_await _role_manager->start();
+    if (this_shard_id() == 0) {
+        // Role manager and password authenticator have this odd startup
+        // mechanism where they asynchronously create the superuser role
+        // in the background. Correct password creation depends on role
+        // creation therefore we need to wait here.
+        co_await _role_manager->ensure_superuser_is_created();
+    }
+    // Authenticator must be started after ensure_superuser_is_created()
+    // because password_authenticator queries system.roles for the
+    // superuser entry created by the role manager.
+    co_await _authenticator->start();
 }
 
 future<> service::stop() {
     _as.request_abort();
+    // Reverse of start() order.
+    co_await _authenticator->stop();
+    co_await _role_manager->stop();
     _cache.set_permission_loader(nullptr);
-    return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
+    co_await _authorizer->stop();
 }
 
 future<> service::ensure_superuser_is_created() {
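The two auth-service hunks above reorder startup: the authorizer starts first (the permission loader calls authorize()), the role manager starts after the loader is set, the superuser is awaited on shard 0, and the authenticator starts last; stop() now unwinds in the reverse order. A small illustrative sketch of the same "start in dependency order, stop in reverse" idea, with made-up component names:

import asyncio


class Component:
    def __init__(self, name, log):
        self.name, self.log = name, log

    async def start(self):
        self.log.append(f"start {self.name}")

    async def stop(self):
        self.log.append(f"stop {self.name}")


async def main():
    log = []
    authorizer = Component("authorizer", log)
    role_manager = Component("role_manager", log)
    authenticator = Component("authenticator", log)

    # start(): authorizer first (the permission loader calls authorize()),
    # then the role manager (creates the superuser), then the authenticator
    # (reads the superuser entry created by the role manager).
    started = []
    for c in (authorizer, role_manager, authenticator):
        await c.start()
        started.append(c)

    # stop(): reverse of start() order.
    for c in reversed(started):
        await c.stop()

    assert log == ["start authorizer", "start role_manager", "start authenticator",
                   "stop authenticator", "stop role_manager", "stop authorizer"]


asyncio.run(main())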
@@ -71,7 +71,7 @@ used. If it is used, the statement will be a no-op if the materialized view alre
 MV Select Statement
 ...................
 
-The select statement of a materialized view creation defines which of the base table columns are included in the view. That
+The select statement of a materialized view creation defines which of the base table is included in the view. That
 statement is limited in a number of ways:
 
 - The :ref:`selection <selection-clause>` is limited to those that only select columns of the base table. In other
main.cc (44 lines changed)
@@ -1810,6 +1810,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
     utils::get_local_injector().inject("stop_after_starting_migration_manager",
         [] { std::raise(SIGSTOP); });
 
+    // Audit must be constructed before the maintenance socket so
+    // that on shutdown (reverse destruction order) the audit service
+    // outlives the maintenance socket and in-flight queries can
+    // still reach audit::inspect() safely.
+    checkpoint(stop_signal, "starting audit service");
+    audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
+        startlog.error("audit start failed: {}", e);
+    }).get();
+    auto audit_stop = defer([] {
+        audit::audit::stop_audit().get();
+    });
+
     // XXX: stop_raft has to happen before query_processor and migration_manager
     // is stopped, since some groups keep using the query
     // processor until are stopped inside stop_raft.
@@ -2340,15 +2352,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
     }).get();
     stop_signal.ready(false);
 
-    if (cfg->maintenance_socket() != "ignore") {
-        // Enable role operations now that node joined the cluster
-        maintenance_auth_service.invoke_on_all([](auth::service& svc) {
-            return auth::ensure_role_operations_are_enabled(svc);
-        }).get();
-
-        start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
-    }
-
     // At this point, `locator::topology` should be stable, i.e. we should have complete information
     // about the layout of the cluster (= list of nodes along with the racks/DCs).
     startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
@@ -2357,16 +2360,23 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
     startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
     db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
 
-    // Start audit service after join_cluster so that the table-based audit backend
-    // can properly create its keyspace and table.
-    checkpoint(stop_signal, "starting audit service");
-    audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
-        startlog.error("audit start failed: {}", e);
-    }).get();
-    auto audit_stop = defer([] {
-        audit::audit::stop_audit().get();
+    // The table-based audit backend needs Raft (via join_cluster)
+    // to create its keyspace and table.
+    checkpoint(stop_signal, "starting audit storage");
+    audit::audit::start_storage(*cfg).get();
+    auto audit_storage_stop = defer([] {
+        audit::audit::stop_storage().get();
     });
 
+    if (cfg->maintenance_socket() != "ignore") {
+        // Enable role operations now that node joined the cluster
+        maintenance_auth_service.invoke_on_all([](auth::service& svc) {
+            return auth::ensure_role_operations_are_enabled(svc);
+        }).get();
+
+        start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
+    }
+
     // Semantic validation of sstable compression parameters from config.
     // Adding here (i.e., after `join_cluster`) to ensure that the
     // required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
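In main.cc the audit service object is still created early, but its storage is started only after join_cluster, and each phase registers its own deferred stop. Below is a hedged sketch of why defer-style cleanup gives the right ordering, using Python's ExitStack as a stand-in for the C++ defer() guards; the event strings are illustrative.

from contextlib import ExitStack

events = []
with ExitStack() as stack:
    events.append("start audit service")
    stack.callback(lambda: events.append("stop audit service"))

    events.append("join cluster")

    events.append("start audit storage")
    stack.callback(lambda: events.append("stop audit storage"))

# ExitStack unwinds in reverse registration order, like a chain of defer()s.
assert events == [
    "start audit service",
    "join cluster",
    "start audit storage",
    "stop audit storage",
    "stop audit service",
]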
@@ -4237,6 +4237,7 @@ public:
         , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
         , _async_gate("topology_coordinator")
     {
+        _lifecycle_notifier.register_subscriber(this);
         _db.get_notifier().register_listener(this);
         // When the delay_cdc_stream_finalization error injection is disabled
         // (test releases it), wake the topology coordinator so it retries
@@ -4400,6 +4401,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
 }
 
 future<> topology_coordinator::refresh_tablet_load_stats() {
+    co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
     auto tm = get_token_metadata_ptr();
 
     locator::load_stats stats;
@@ -4723,7 +4725,6 @@ future<> topology_coordinator::run() {
 
     co_await _async_gate.close();
     co_await std::move(tablet_load_stats_refresher);
-    co_await _tablet_load_stats_refresh.join();
     co_await std::move(cdc_generation_publisher);
     co_await std::move(cdc_streams_gc);
     co_await std::move(gossiper_orphan_remover);
@@ -4736,6 +4737,8 @@ future<> topology_coordinator::stop() {
     co_await _db.get_notifier().unregister_listener(this);
     utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
     _topo_sm.on_tablet_split_ready = nullptr;
+    co_await _lifecycle_notifier.unregister_subscriber(this);
+    co_await _tablet_load_stats_refresh.join();
 
     // if topology_coordinator::run() is aborted either because we are not a
     // leader anymore, or we are shutting down as a leader, we have to handle
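The topology-coordinator hunks above move the join of the _tablet_load_stats_refresh fiber from run() into stop(), so a refresh triggered between run() finishing and stop() being called is still awaited before the coordinator goes away. An asyncio analogue of that join-on-stop pattern (illustrative names, not the Seastar types):

import asyncio


class Coordinator:
    def __init__(self):
        self._refresh_task = None
        self.stats = None
        self.alive = True

    def trigger_refresh(self):
        self._refresh_task = asyncio.create_task(self._refresh())

    async def _refresh(self):
        await asyncio.sleep(0.05)      # simulates a slow load-stats fetch
        assert self.alive              # would be the use-after-free point in C++
        self.stats = "fresh"

    async def stop(self):
        if self._refresh_task is not None:
            await self._refresh_task   # the added join() in stop()
        self.alive = False


async def main():
    c = Coordinator()
    c.trigger_refresh()                # e.g. a schema-change notification
    await c.stop()                     # joins the refresh; no dangling task
    assert c.stats == "fresh"


asyncio.run(main())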
@@ -4797,7 +4800,6 @@ future<> run_topology_coordinator(
         topology_cmd_rpc_tracker};
 
     std::exception_ptr ex;
-    lifecycle_notifier.register_subscriber(&coordinator);
     try {
         rtlogger.info("start topology coordinator fiber");
         co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4818,7 +4820,7 @@ future<> run_topology_coordinator(
         }
         on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
     }
-    co_await lifecycle_notifier.unregister_subscriber(&coordinator);
+    co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
     co_await coordinator.stop();
 }
 
@@ -543,11 +543,16 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
     // during SSTable writing and removed before sealing. If the write
     // failed before sealing, the file may still be on disk and must be
     // cleaned up explicitly.
+    // The component is only defined for the `ms` sstable format; for
+    // older formats it is absent from the component map and looking up
+    // its filename would throw std::out_of_range.
     // Use file_exists() to avoid a C++ exception on the common path
     // where the file was already removed before sealing.
-    auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
-    if (co_await file_exists(temp_hashes)) {
-        co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
+    if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) {
+        auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
+        if (co_await file_exists(temp_hashes)) {
+            co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
+        }
     }
     if (sync) {
         co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
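The wipe() change above consults the sstable format's component map before building the TemporaryHashes filename, because older formats do not define that component and the lookup would throw. A toy Python version of the same guard, with invented component tables:

COMPONENT_MAPS = {
    "me": {"TOC": "TOC.txt", "Data": "Data.db"},
    "ms": {"TOC": "TOC.txt", "Data": "Data.db", "TemporaryHashes": "Hashes.tmp"},
}


def temporary_hashes_path(version: str, generation: int):
    components = COMPONENT_MAPS[version]
    if "TemporaryHashes" not in components:      # absent for older formats
        return None
    return f"gen-{generation}-{components['TemporaryHashes']}"


assert temporary_hashes_path("me", 1) is None            # no exception, just skipped
assert temporary_hashes_path("ms", 1) == "gen-1-Hashes.tmp"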
@@ -246,6 +246,33 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
     });
 }
 
+// Reproducer for SCYLLADB-1697
+SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
+    return sstables::test_env::do_with_async([] (test_env& env) {
+        for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
+            testlog.info("Testing sstable version: {}", version);
+            auto sst = make_sstable_for_this_shard([&env, version] {
+                return env.make_sstable(test_table_schema(), version);
+            });
+
+            // Sanity: the TOC was written, otherwise the assertion below would be vacuous.
+            BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());
+
+            sst->unlink().get();
+
+            std::vector<sstring> remaining;
+            lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
+                    [&remaining] (fs::path, directory_entry de) {
+                remaining.push_back(de.name);
+                return make_ready_future<>();
+            }).get();
+
+            BOOST_REQUIRE_MESSAGE(remaining.empty(),
+                fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
+        }
+    });
+}
+
 // Test the absence of TOC. Behavior is controllable by a flag
 SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
     return sstables::test_env::do_with_async([] (test_env& env) {
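The new Boost test above writes an sstable, unlinks it, and asserts the directory holds no leftover component files. The same check, reduced to a self-contained Python snippet over a temporary directory:

import os
import tempfile

with tempfile.TemporaryDirectory() as d:
    # "write" an sstable: several component files sharing one generation
    for component in ("Data.db", "Index.db", "TOC.txt"):
        with open(os.path.join(d, f"gen1-{component}"), "w") as f:
            f.write("x")

    # "unlink" it: remove every component that was written
    for name in os.listdir(d):
        os.remove(os.path.join(d, name))

    remaining = os.listdir(d)
    assert not remaining, f"Expected empty dir after unlink, found: {remaining}"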
@@ -11,13 +11,11 @@ from typing import TYPE_CHECKING
 
 from cassandra.auth import PlainTextAuthProvider
 
-from test.pylib.internal_types import ServerInfo
 from test.pylib.manager_client import ManagerClient
 from test.cluster.dtest.ccmlib.common import logger
 from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode
 
 if TYPE_CHECKING:
-    from collections.abc import Iterable
     from typing import Any
 
 
@@ -29,6 +27,10 @@ class ScyllaCluster:
         self.manager = manager
         self.scylla_mode = scylla_mode
         self._config_options = {}
+        # Cached ScyllaNode instances. Nodes are appended by _add_nodes()
+        # in the order they are created by servers_add().
+        self._nodes: list[ScyllaNode] = []
+        self._next_node_num: int = 1
 
         if self.scylla_mode == "debug":
             self.default_wait_other_notice_timeout = 600
@@ -39,19 +41,20 @@ class ScyllaCluster:
 
         self.force_wait_for_cluster_start = force_wait_for_cluster_start
 
-    @staticmethod
-    def _sorted_nodes(servers: Iterable[ServerInfo]) -> list[ServerInfo]:
-        return sorted(servers, key=lambda s: s.server_id)
+    def _add_nodes(self, servers: list) -> None:
+        """Create ScyllaNode instances for the given servers and cache them."""
+        for server in servers:
+            name = f"node{self._next_node_num}"
+            self._next_node_num += 1
+            self._nodes.append(ScyllaNode(
+                cluster=self, server=server, name=name))
 
     @property
     def nodes(self) -> dict[str, ScyllaNode]:
         return {node.name: node for node in self.nodelist()}
 
     def nodelist(self) -> list[ScyllaNode]:
-        return [
-            ScyllaNode(cluster=self, server=server, name=f"node{n}")
-            for n, server in enumerate(self._sorted_nodes(self.manager.all_servers()), start=1)
-        ]
+        return list(self._nodes)
 
     def get_node_ip(self, nodeid: int) -> str:
         return self.nodelist()[nodeid-1].address()
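The ScyllaCluster change above creates ScyllaNode wrappers once, in _add_nodes(), and has nodelist() return a copy of the cached list instead of rebuilding wrappers from manager.all_servers() on every call. The toy example below shows what recreating wrappers loses: any per-node state a test sets, such as an smp override.

class Node:
    def __init__(self, name):
        self.name = name
        self.smp = None


class RecreatingCluster:
    def nodelist(self):
        return [Node("node1")]          # fresh wrapper on every call


class CachingCluster:
    def __init__(self):
        self._nodes = [Node("node1")]

    def nodelist(self):
        return list(self._nodes)        # copy of the list, same cached objects


recreating, caching = RecreatingCluster(), CachingCluster()
recreating.nodelist()[0].smp = 2
caching.nodelist()[0].smp = 2
assert recreating.nodelist()[0].smp is None   # setting was lost
assert caching.nodelist()[0].smp == 2         # setting persisted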
@@ -61,16 +64,16 @@ class ScyllaCluster:
             self.manager.auth_provider = PlainTextAuthProvider(username="cassandra", password="cassandra")
         match nodes:
             case int():
-                self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1")
+                self._add_nodes(self.manager.servers_add(servers_num=nodes, config=self._config_options, start=False, auto_rack_dc="dc1"))
             case list():
                 for dc, n_nodes in enumerate(nodes, start=1):
                     dc_name = f"dc{dc}"
-                    self.manager.servers_add(
+                    self._add_nodes(self.manager.servers_add(
                         servers_num=n_nodes,
                         config=self._config_options,
                         start=False,
                         auto_rack_dc=dc_name
-                    )
+                    ))
             case dict():
                 # Supported spec: {"dc1": {"rack1": 3, "rack2": 2}, "dc2": {"rack1": 2}}
                 for dc, dc_nodes in nodes.items():
@@ -79,7 +82,7 @@ class ScyllaCluster:
                     for rack, rack_nodes in dc_nodes.items():
                         if not isinstance(rack_nodes, int):
                             raise RuntimeError(f"Unsupported topology specification: {nodes}")
-                        self.manager.servers_add(
+                        self._add_nodes(self.manager.servers_add(
                             servers_num=rack_nodes,
                             config=self._config_options,
                             property_file={
@@ -87,7 +90,7 @@ class ScyllaCluster:
                                 "rack": rack,
                             },
                             start=False,
-                        )
+                        ))
             case _:
                 raise RuntimeError(f"Unsupported topology specification: {nodes}")
 
@@ -17,6 +17,7 @@ from itertools import chain
 from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
+import logging
 
 from test.cluster.dtest.ccmlib.common import ArgumentError, wait_for, BIN_DIR
 from test.pylib.internal_types import ServerUpState
@@ -28,6 +29,9 @@ if TYPE_CHECKING:
     from test.cluster.dtest.ccmlib.scylla_cluster import ScyllaCluster
 
 
+logger = logging.getLogger("scylla_node")
+
+
 NODETOOL_STDERR_IGNORED_PATTERNS = (
     re.compile(r"WARNING: debug mode. Not for benchmarking or production"),
     re.compile(
@@ -149,15 +153,20 @@ class ScyllaNode:
         return self.cluster.scylla_mode
 
     def set_smp(self, smp: int) -> None:
+        logger.debug(f"Setting smp: {self=} {smp=}")
         self._smp_set_during_test = smp
 
     def smp(self) -> int:
+        logger.debug(f"Getting smp: {self=} _smp_set_during_test={self._smp_set_during_test} _smp={self._smp} {DEFAULT_SMP=}")
         return self._smp_set_during_test or self._smp or DEFAULT_SMP
 
     def memory(self) -> int:
         return self._memory or self.smp() * DEFAULT_MEMORY_PER_CPU
 
     def _adjust_smp_and_memory(self, smp: int | None = None, memory: int | None = None) -> None:
+        if not memory and not smp:
+            return
+        logger.debug(f"Adjusting smp={smp} memory={memory} current_smp={self._smp} current_memory={self._memory}")
         if memory:
             self._memory = memory // (smp or self.smp()) * self.smp()
         if smp:
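smp() above resolves the shard count as a fallback chain: the value set by the test wins, then the node's configured value, then the default. A tiny sketch of that resolution order (DEFAULT_SMP here is a made-up constant):

DEFAULT_SMP = 2

def resolve_smp(smp_set_during_test, configured_smp):
    # test override wins, then the configured value, then the default
    return smp_set_during_test or configured_smp or DEFAULT_SMP

assert resolve_smp(None, None) == DEFAULT_SMP
assert resolve_smp(None, 4) == 4
assert resolve_smp(1, 4) == 1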
@@ -446,6 +455,8 @@ class ScyllaNode:
 
         self.mark = self.mark_log()
 
+        logger.debug(f"Starting server: server_id={self.server_id} {scylla_args=} {scylla_env=}")
+
         self.cluster.manager.server_start(
             server_id=self.server_id,
             seeds=None if self.bootstrap else [self.address()],
test/cluster/dtest/set_smp_test.py (new file, 46 lines)
@@ -0,0 +1,46 @@
+#
+# Copyright (C) 2026-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
+#
+
+import logging
+
+import pytest
+
+from dtest_class import Tester
+
+logger = logging.getLogger(__file__)
+
+
+@pytest.mark.single_node
+class TestSetSmp(Tester):
+    """Test that node.set_smp() properly persists across restarts."""
+
+    def _get_smp_from_log(self, node, from_mark=None):
+        """Extract smp value from the node's log by looking at the SHARD_COUNT gossip value."""
+        matches = node.grep_log(r"SHARD_COUNT : Value\((\d+),\d+\)", from_mark=from_mark)
+        assert matches, "Could not find SHARD_COUNT in node log"
+        # Return the last match (most recent start)
+        return int(matches[-1][1].group(1))
+
+    def test_set_smp(self):
+        """Verify that set_smp() takes effect on the next start."""
+        cluster = self.cluster
+        cluster.populate(1).start(wait_for_binary_proto=True)
+        node1 = cluster.nodelist()[0]
+
+        default_smp = self._get_smp_from_log(node1)
+
+        cluster.stop()
+
+        # set_smp to a different value and restart without jvm_args
+        target_smp = 1 if default_smp != 1 else 2
+        node1.set_smp(target_smp)
+        mark = node1.mark_log()
+        cluster.start(wait_for_binary_proto=True)
+
+        node1 = cluster.nodelist()[0]
+        actual_smp = self._get_smp_from_log(node1, from_mark=mark)
+        assert actual_smp == target_smp, \
+            f"Expected smp={target_smp} after set_smp({target_smp}), got {actual_smp}"
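The new test derives the effective shard count from the SHARD_COUNT gossip value in the node log. A standalone check of that regular expression against an invented sample line:

import re

line = "gossip - SHARD_COUNT : Value(4,12345)"
m = re.search(r"SHARD_COUNT : Value\((\d+),\d+\)", line)
assert m and int(m.group(1)) == 4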
@@ -29,12 +29,16 @@
 from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
+from cassandra.connection import UnixSocketEndPoint
+from cassandra.policies import WhiteListRoundRobinPolicy
 from cassandra.query import BatchStatement, BatchType, SimpleStatement, named_tuple_factory
 
+from test.cluster.conftest import cluster_con
 from test.cluster.dtest.dtest_class import create_ks, wait_for
 from test.cluster.dtest.tools.assertions import assert_invalid
 from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
 
+from test.pylib.driver_utils import safe_driver_shutdown
 from test.pylib.manager_client import ManagerClient
 from test.pylib.rest_client import read_barrier
 from test.pylib.skip_types import skip_env
@@ -273,6 +277,7 @@ class AuditEntry:
     statement: str
     table: str
     user: str
+    source: str = "127.0.0.1"
 
 
 class AuditBackend:
@@ -449,6 +454,13 @@ class AuditBackendSyslog(AuditBackend):
             entries.append(self.line_to_row(line, idx))
         return { self.audit_mode(): entries }
 
+    @staticmethod
+    def _parse_address(addr_port):
+        """Extract IP from 'ip:port' (IPv4) or '[ip]:port' (IPv6)."""
+        if addr_port.startswith("["):
+            return addr_port[1:addr_port.index("]")]
+        return addr_port.split(":")[0]
+
     def line_to_row(self, line, idx):
         metadata, data = line.split(": ", 1)
         data = "".join(data.splitlines())  # Remove newlines
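_parse_address() added above strips the port from both plain IPv4 and bracketed IPv6 addresses so IPv6 syslog entries parse correctly. A standalone copy of the helper with example inputs:

def parse_address(addr_port: str) -> str:
    """Extract IP from 'ip:port' (IPv4) or '[ip]:port' (IPv6)."""
    if addr_port.startswith("["):
        return addr_port[1:addr_port.index("]")]
    return addr_port.split(":")[0]

assert parse_address("127.0.0.1:9042") == "127.0.0.1"
assert parse_address("[::1]:9042") == "::1"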
@@ -460,9 +472,9 @@ class AuditBackendSyslog(AuditBackend):
         # and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59)
         date = datetime.datetime(2000, 1, 1, 0, 0)
 
-        node = match.group("node").split(":")[0]
+        node = self._parse_address(match.group("node"))
         statement = match.group("query").replace("\\", "")
-        source = match.group("client_ip").split(":")[0]
+        source = self._parse_address(match.group("client_ip"))
         event_time = uuid.UUID(int=idx)
         t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username"))
         return t
@@ -582,6 +594,7 @@ class CQLAuditTester(AuditTester):
         user="anonymous",
         cl="ONE",
         error=False,
+        source="127.0.0.1",
     ):
         self.assert_audit_row_fields(row)
         assert row.node in self.server_addresses
@@ -590,7 +603,7 @@ class CQLAuditTester(AuditTester):
         assert row.error == error
         assert row.keyspace_name == ks
         assert row.operation == statement
-        assert row.source == "127.0.0.1"
+        assert row.source == source
         assert row.table_name == table
         assert row.username == user
 
@@ -814,7 +827,7 @@ class CQLAuditTester(AuditTester):
         sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
         assert len(sorted_new_rows) == len(expected_entries)
         for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
-            self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
+            self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error, entry.source)
 
     async def verify_keyspace(self, audit_settings=None, helper=None):
         """
@@ -1854,6 +1867,44 @@ class CQLAuditTester(AuditTester):
         finally:
             session.execute("DROP KEYSPACE IF EXISTS kss")
 
+    # Unix domain sockets have no IP peer address. Seastar's
+    # socket_address::addr() falls through to the default case for
+    # AF_UNIX and returns a zero-initialised in6_addr, i.e. "::".
+    MAINTENANCE_SOCKET_SOURCE = "::"
+
+    async def _test_audit_maintenance_socket_user_creation(self, manager, helper_class):
+        with helper_class() as helper:
+            session = await self.prepare(
+                user="cassandra", password="cassandra",
+                helper=helper,
+                audit_settings={**helper.audit_default_settings, "audit_categories": "DCL", "audit_keyspaces": ""},
+                create_keyspace=False,
+            )
+
+            servers = await manager.running_servers()
+            server = servers[0]
+            socket_path = await manager.server_get_maintenance_socket_path(server.server_id)
+
+            logger.info("Connecting to maintenance socket")
+            endpoint = UnixSocketEndPoint(socket_path)
+            maint_cluster = cluster_con([endpoint],
+                                        load_balancing_policy=WhiteListRoundRobinPolicy([endpoint]))
+            maint_session = maint_cluster.connect()
+
+            role_name = "audit_test_admin"
+            create_stmt = f"CREATE ROLE {role_name} WITH PASSWORD = 'secret' AND SUPERUSER = true AND LOGIN = true"
+            expected_operation = f"CREATE ROLE {role_name} WITH PASSWORD = '***' AND SUPERUSER = true AND LOGIN = true"
+
+            logger.info("Creating superuser via maintenance socket and verifying audit entry")
+            expected_entries = [AuditEntry(category="DCL", statement=expected_operation,
+                                           user="anonymous", table="", ks="", cl="LOCAL_QUORUM", error=False,
+                                           source=self.MAINTENANCE_SOCKET_SOURCE)]
+            with self.assert_entries_were_added(session, expected_entries):
+                maint_session.execute(create_stmt)
+
+            logger.info("Cleaning up created role")
+            maint_session.execute(f"DROP ROLE IF EXISTS {role_name}")
+            safe_driver_shutdown(maint_cluster)
 
 # AuditBackendTable, no auth, rf=1
 
@@ -1946,6 +1997,14 @@ async def test_service_level_statements_standalone(manager: ManagerClient):
     await CQLAuditTester(manager)._test_service_level_statements()
 
 
+async def test_audit_maintenance_socket_user_creation(manager: ManagerClient):
+    """Verify that creating a superuser via the maintenance socket is audited."""
+    t = CQLAuditTester(manager)
+    await t._test_audit_maintenance_socket_user_creation(manager, AuditBackendTable)
+    Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
+    await t._test_audit_maintenance_socket_user_creation(manager, Syslog)
+
+
 # AuditBackendSyslog, no auth, rf=1
 
 async def test_audit_syslog_noauth(manager: ManagerClient):
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
 from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
 from test.pylib.tablets import get_all_tablet_replicas
 from test.cluster.tasks.task_manager_client import TaskManagerClient
-from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
+from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
 from test.pylib.util import wait_for_cql_and_get_hosts
 
 from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
 # affected replica but process the UNREPAIRED sstable on the others, so the classification
 # divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
 # on the affected replica leading to data resurrection.
-@pytest.mark.asyncio
-@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
-    cmdline = ['--hinted-handoff-enabled', '0']
-    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
-        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
-
-    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
-    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
-    await cql.run_async(
-        f"ALTER TABLE {ks}.test WITH compaction = "
-        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
-    )
-
-    # Disable autocompaction everywhere so we control exactly when compaction runs.
-    for s in servers:
-        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
-
-    scylla_path = await manager.server_get_exe(servers[0].server_id)
-
-    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
-    # Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
-    # S0'(repaired_at=1) on all nodes.
-    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
-
-    # Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
-    # These will be the subject of repair 2.
-    repair2_keys = list(range(current_key, current_key + 10))
-    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
-    for s in servers:
-        await manager.api.flush_keyspace(s.ip_addr, ks)
-    current_key += 10
-
+class _LeadershipTransferred(Exception):
+    """Raised when leadership transferred to servers[1] during the test, requiring a retry."""
+    pass
+
+
+async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
+    """Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
+
+    Returns the next current_key value.
+    Raises _LeadershipTransferred if servers[1] becomes coordinator after the
+    restart, signalling the caller to retry.
+    """
+    # Ensure servers[1] is not the topology coordinator. If the coordinator is
+    # restarted, the Raft leader dies, a new election occurs, and the new
+    # coordinator re-initiates tablet repair -- flushing memtables on all replicas
+    # and marking post-repair data as repaired. That legitimate re-repair masks
+    # the compaction-merge bug this test detects.
     coord = await get_topology_coordinator(manager)
     coord_serv = await find_server_by_host_id(manager, servers, coord)
+    if coord_serv == servers[1]:
+        other = next(s for s in servers if s != servers[1])
+        await ensure_group0_leader_on(manager, other)
+        coord = await get_topology_coordinator(manager)
+        coord_serv = await find_server_by_host_id(manager, servers, coord)
     coord_log = await manager.server_open_log(coord_serv.server_id)
     coord_mark = await coord_log.mark()
 
@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
     await manager.server_start(target.server_id)
     await manager.servers_see_each_other(servers)
 
+    # Check if leadership transferred to servers[1] during the restart.
+    # If so, the new coordinator will re-initiate repair, masking the bug.
+    new_coord = await get_topology_coordinator(manager)
+    new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
+    if new_coord_serv == servers[1]:
+        await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
+        await manager.api.wait_task(servers[0].ip_addr, task_id)
+        raise _LeadershipTransferred(
+            "servers[1] became topology coordinator after restart")
+
     # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
     # confirming that the bug was triggered (S1' and E merged during the race window).
     deadline = time.time() + 60
@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
     if not compaction_ran:
         logger.warning("Compaction did not merge S1' and E after restart during the race window; "
                        "the bug was not triggered. Skipping assertion.")
-        return
+        return current_key
 
     # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
     # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
                 f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
                 f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
 
-    # servers[0] and servers[2] flushed post-repair keys after the race window closed,
-    # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
+    # servers[0] and servers[2] were never restarted and the coordinator stayed
+    # alive throughout, so no re-repair could have flushed their memtables.
+    # Post-repair keys must NOT appear in repaired sstables on these servers.
     assert not (repaired_keys_0 & post_repair_key_set), \
         f"servers[0] should not have post-repair keys in repaired sstables, " \
         f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
         f"on servers[1] after restart lost the being_repaired markers during the race window. " \
         f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
         f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
+    return current_key
+
+@pytest.mark.asyncio
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
+    cmdline = ['--hinted-handoff-enabled', '0']
+    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
+        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
+
+    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
+    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
+    await cql.run_async(
+        f"ALTER TABLE {ks}.test WITH compaction = "
+        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
+    )
+
+    # Disable autocompaction everywhere so we control exactly when compaction runs.
+    for s in servers:
+        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
+
+    scylla_path = await manager.server_get_exe(servers[0].server_id)
+
+    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
+    # Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
+    # S0'(repaired_at=1) on all nodes.
+    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
+
+    # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
+    # These will be the subject of repair 2.
+    repair2_keys = list(range(current_key, current_key + 10))
+    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
+    for s in servers:
+        await manager.api.flush_keyspace(s.ip_addr, ks)
+    current_key += 10
+
+    # If leadership transfers to servers[1] between our coordinator check and the
+    # restart, the coordinator change masks the bug. Detect and retry.
+    max_attempts = 5
+    for attempt in range(1, max_attempts + 1):
+        try:
+            current_key = await _do_race_window_promotes_unrepaired_data(
+                manager, servers, cql, ks, token, scylla_path, current_key)
+            return
+        except _LeadershipTransferred as e:
+            logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
+
+    pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
+                "could not run the test without coordinator interference.")
 
 # ----------------------------------------------------------------------------
 # Tombstone GC safety tests
@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
-from test.cluster.util import get_topology_coordinator, trigger_stepdown
+from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table

import pytest
import logging
@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager)
if coord3:
break

+@pytest.mark.asyncio
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
+"""Verify that _tablet_load_stats_refresh is properly joined during
+topology coordinator shutdown, even when a schema change notification
+triggers a refresh between run() completing and stop() being called.

+Reproduces the scenario using two injection points:
+- topology_coordinator_pause_before_stop: pauses after run() finishes
+but before stop() is called
+- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
+so it's still in-flight during shutdown

+Without the join in stop(), the refresh task outlives the coordinator
+and accesses freed memory.
+"""
+servers = await manager.servers_add(3)
+await manager.get_ready_cql(servers)

+async with new_test_keyspace(manager,
+"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
+coord = await get_topology_coordinator(manager)
+host_ids = [await manager.get_host_id(s.server_id) for s in servers]
+coord_idx = host_ids.index(coord)
+coord_server = servers[coord_idx]

+log = await manager.server_open_log(coord_server.server_id)
+mark = await log.mark()

+# Injection B: pause between run() returning and stop() being called.
+await manager.api.enable_injection(
+coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)

+# Stepdown causes the topology coordinator to abort and shut down.
+logger.info("Triggering stepdown on coordinator")
+await trigger_stepdown(manager, coord_server)

+# Wait for injection B to fire. The coordinator has finished run() but
+# the schema change listener is still registered.
+mark, _ = await log.wait_for(
+"topology_coordinator_pause_before_stop: waiting", from_mark=mark)

+# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
+# Enable it now so it only catches the notification-triggered call.
+await manager.api.enable_injection(
+coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)

+# CREATE TABLE fires on_create_column_family on the old coordinator which
+# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
+# via with_scheduling_group on the gossip scheduling group.
+logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
+async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
+# Wait for injection A: refresh_tablet_load_stats() is now blocked before
+# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
+await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)

+# Release injection B: coordinator proceeds through stop().
+# Without the fix, stop() returns quickly and run_topology_coordinator
+# frees the topology_coordinator frame. With the fix, stop() blocks at
+# _tablet_load_stats_refresh.join() until injection A is released.
+logger.info("Releasing injection B: coordinator will stop")
+await manager.api.message_injection(
+coord_server.ip_addr, "topology_coordinator_pause_before_stop")

+# Release injection A: refresh_tablet_load_stats() resumes and accesses
+# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
+# points to freed memory and ASan detects heap-use-after-free.
+logger.info("Releasing injection A: refresh resumes")
+await manager.api.message_injection(
+coord_server.ip_addr, "refresh_tablet_load_stats_pause")

+# If the bug is present, the node crashed. read_barrier will fail.
+await read_barrier(manager.api, coord_server.ip_addr)
@@ -435,8 +435,9 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
-async def get_replication_options(ks: str):
-res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
+async def get_replication_options(ks: str, host, ip_addr):
+await read_barrier(manager.api, ip_addr)
+res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl

@@ -451,43 +452,44 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
host_ids = [await manager.get_host_id(s.server_id) for s in servers]

cql = manager.get_cql()
+host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
-repl = await get_replication_options("ks1")
+repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'

await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
await cql.run_async("create table ks2.t (pk int primary key);")
-repl = await get_replication_options("ks2")
+repl = await get_replication_options("ks2", host, servers[0].ip_addr)
assert repl['dc1'] == '1'
assert repl['dc2'] == '2'

await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks3.t (pk int primary key);")
-repl = await get_replication_options("ks3")
+repl = await get_replication_options("ks3", host, servers[0].ip_addr)
assert repl['dc1'] == '1'

await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks4.t (pk int primary key);")
-repl = await get_replication_options("ks4")
+repl = await get_replication_options("ks4", host, servers[0].ip_addr)
assert repl['dc1'] == '1'

await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks5.t (pk int primary key);")
-repl = await get_replication_options("ks5")
+repl = await get_replication_options("ks5", host, servers[0].ip_addr)
assert repl['dc1'] == '2'
assert repl['dc2'] == '2'

await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks6.t (pk int primary key);")
-repl = await get_replication_options("ks6")
+repl = await get_replication_options("ks6", host, servers[0].ip_addr)
assert repl['dc1'] == '2'

[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]

await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
-repl = await get_replication_options("ks1")
+repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == ['rack1b']

tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -497,7 +499,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
assert r.replicas[0][0] == host_ids[1]

await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
-repl = await get_replication_options("ks2")
+repl = await get_replication_options("ks2", host, servers[0].ip_addr)
assert repl['dc1'] == ['rack1a']
assert len(repl['dc2']) == 2
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -523,13 +525,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
pass

await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
-repl = await get_replication_options("ks5")
+repl = await get_replication_options("ks5", host, servers[0].ip_addr)
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
assert repl['dc2'] == '2'

await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
-repl = await get_replication_options("ks6")
+repl = await get_replication_options("ks6", host, servers[0].ip_addr)
assert repl['dc1'] == '2'
assert len(repl['dc2']) == 1
assert repl['dc2'][0] == 'rack2a'
@@ -537,8 +539,9 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
-async def get_replication_options(ks: str):
-res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
+async def get_replication_options(ks: str, host, ip_addr):
+await read_barrier(manager.api, ip_addr)
+res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl

@@ -551,10 +554,11 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]

cql = manager.get_cql()
+host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
-repl = await get_replication_options("ks1")
+repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'

await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -574,19 +578,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
servers = servers[0:-1]

await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
-repl = await get_replication_options("ks1")
+repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == ['rack1b']

logging.info("Rolling restart")
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])

await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
-repl = await get_replication_options("ks2")
+repl = await get_replication_options("ks2", host, servers[0].ip_addr)
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']

await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
-repl = await get_replication_options("ks3")
+repl = await get_replication_options("ks3", host, servers[0].ip_addr)
assert len(repl['dc1']) == 1
assert len(repl['dc2']) == 1
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -602,7 +606,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
assert failed

await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
-repl = await get_replication_options("ks1")
+repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert len(repl['dc1']) == 1
assert repl['dc1'][0] == 'rack1b'
assert len(repl['dc2']) == 1
@@ -1109,8 +1113,9 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
-async def get_replication_options(ks: str):
-res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
+async def get_replication_options(ks: str, host, ip_addr):
+await read_barrier(manager.api, ip_addr)
+res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl

@@ -1128,10 +1133,11 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
host_ids = [await manager.get_host_id(s.server_id) for s in servers]

cql = manager.get_cql()
+host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
-repl = await get_replication_options("ks1")
+repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'

[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1165,7 +1171,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
failed = True
assert failed

-repl = await get_replication_options("ks1")
+repl = await get_replication_options("ks1", host, servers[0].ip_addr)
assert repl['dc1'] == '1'

@pytest.mark.asyncio
@@ -18,6 +18,7 @@
#include <seastar/testing/test_case.hh>

#include "test/lib/exception_utils.hh"
+#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "ldap_common.hh"
#include "service/migration_manager.hh"
@@ -681,3 +682,41 @@ SEASTAR_TEST_CASE(ldap_config) {
},
make_ldap_config());
}

+// Reproduces the race between the cache pruner and the permission
+// loader lifecycle during shutdown. Refs SCYLLADB-1679.
+SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
+auto cfg = make_ldap_config();
+cfg->permissions_update_interval_in_ms.set(1);

+auto call_count = seastar::make_lw_shared<int>(0);

+co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
+auto& cache = env.auth_cache().local();

+testlog.info("Populating 50 cache entries");
+for (int i = 0; i < 50; i++) {
+auto r = auth::make_data_resource("system", fmt::format("t{}", i));
+cache.get_permissions(auth::role_or_anonymous(), r).get();
+}

+testlog.info("Installing slow permission loader (10ms per call)");
+cache.set_permission_loader(
+[call_count] (const auth::role_or_anonymous&, const auth::resource&)
+-> seastar::future<auth::permission_set> {
+++(*call_count);
+co_await seastar::sleep(std::chrono::milliseconds(10));
+co_return auth::permission_set();
+});

+testlog.info("Waiting for pruner to start reloading");
+while (*call_count == 0) {
+seastar::sleep(std::chrono::milliseconds(1)).get();
+}

+testlog.info("Pruner started, letting teardown run");
+}, cfg);

+testlog.info("Loader called {} times", *call_count);
+}
@@ -371,9 +371,13 @@ int scylla_simple_query_main(int argc, char** argv) {
audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) {
fmt::print("audit start failed: {}", e);
}).get();
+audit::audit::start_storage(env.local_db().get_config()).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
+auto audit_storage_stop = defer([] {
+audit::audit::stop_storage().get();
+});
auto results = do_cql_test(env, cfg);
aggregated_perf_results agg(results);
std::cout << agg << std::endl;
@@ -126,6 +126,9 @@ class CppFile(pytest.File, ABC):
return args

def collect(self) -> Iterator[CppTestCase]:
+if BUILD_MODE not in self.stash:
+return

custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)

for test_case in self.list_test_cases():
@@ -163,6 +163,11 @@ def scylla_binary(testpy_test) -> Path:


def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
+items[:] = [
+item for item in items
+if (parent_file := item.getparent(cls=pytest.File)) is not None
+and BUILD_MODE in parent_file.stash
+]
for item in items:
modify_pytest_item(item=item)

@@ -285,7 +290,10 @@ def pytest_configure(config: pytest.Config) -> None:
pytest_log_dir.mkdir(parents=True, exist_ok=True)
if not _pytest_config.getoption("--save-log-on-success"):
for file in pytest_log_dir.glob("*"):
-file.unlink()
+# This will help in case framework tests are executed with test.py even if it's the wrong way to run them.
+# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has led to a race
+# condition, especially with repeat.
+file.unlink(missing_ok=True)

_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"

@@ -340,7 +348,8 @@ def pytest_collect_file(file_path: pathlib.Path,
repeats = list(product(build_modes, parent.config.run_ids))

if not repeats:
-return []
+parent.stash[REPEATING_FILES].remove(file_path)
+return collectors

ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable(
@@ -75,6 +75,7 @@ def test_no_bare_skip_markers_in_collection():
"--collect-only",
"--ignore=boost", "--ignore=raft",
"--ignore=ldap", "--ignore=vector_search",
+"--ignore=unit",
"-p", "no:sugar"],
capture_output=True, text=True,
cwd=str(_TEST_ROOT),