Compare commits

..

1 Commits

Author SHA1 Message Date
Anna Mikhlin
5231c77e8e Update ScyllaDB version to: 2026.2.0-rc0 2026-04-26 15:28:16 +03:00
11 changed files with 81 additions and 274 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.3.0-dev
VERSION=2026.2.0-rc0
if test -f version
then

View File

@@ -258,11 +258,13 @@ future<> ldap_role_manager::start() {
} catch (const seastar::sleep_aborted&) {
co_return; // ignore
}
try {
co_await _cache.reload_all_permissions();
} catch (...) {
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
try {
co_await c.reload_all_permissions();
} catch (...) {
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
});
}
});
return _std_mgr.start();

View File

@@ -157,20 +157,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
return create_legacy_keyspace_if_missing(mm);
});
}
// Authorizer must be started before the permission loader is set,
// because the loader calls _authorizer->authorize().
// The loader must be set before starting the role manager, because
// LDAP role manager starts a pruner fiber that calls
// reload_all_permissions() which asserts _permission_loader is set.
co_await _authorizer->start();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
co_await _role_manager->start();
if (this_shard_id() == 0) {
// Role manager and password authenticator have this odd startup
@@ -179,19 +165,21 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
// creation therefore we need to wait here.
co_await _role_manager->ensure_superuser_is_created();
}
// Authenticator must be started after ensure_superuser_is_created()
// because password_authenticator queries system.roles for the
// superuser entry created by the role manager.
co_await _authenticator->start();
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
}
future<> service::stop() {
_as.request_abort();
// Reverse of start() order.
co_await _authenticator->stop();
co_await _role_manager->stop();
_cache.set_permission_loader(nullptr);
co_await _authorizer->stop();
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
}
future<> service::ensure_superuser_is_created() {

View File

@@ -4237,7 +4237,6 @@ public:
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator")
{
_lifecycle_notifier.register_subscriber(this);
_db.get_notifier().register_listener(this);
// When the delay_cdc_stream_finalization error injection is disabled
// (test releases it), wake the topology coordinator so it retries
@@ -4401,7 +4400,6 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
}
future<> topology_coordinator::refresh_tablet_load_stats() {
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
auto tm = get_token_metadata_ptr();
locator::load_stats stats;
@@ -4725,6 +4723,7 @@ future<> topology_coordinator::run() {
co_await _async_gate.close();
co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
co_await std::move(cdc_generation_publisher);
co_await std::move(cdc_streams_gc);
co_await std::move(gossiper_orphan_remover);
@@ -4737,8 +4736,6 @@ future<> topology_coordinator::stop() {
co_await _db.get_notifier().unregister_listener(this);
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
_topo_sm.on_tablet_split_ready = nullptr;
co_await _lifecycle_notifier.unregister_subscriber(this);
co_await _tablet_load_stats_refresh.join();
// if topology_coordinator::run() is aborted either because we are not a
// leader anymore, or we are shutting down as a leader, we have to handle
@@ -4800,6 +4797,7 @@ future<> run_topology_coordinator(
topology_cmd_rpc_tracker};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);
try {
rtlogger.info("start topology coordinator fiber");
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
@@ -4820,7 +4818,7 @@ future<> run_topology_coordinator(
}
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
}
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
co_await coordinator.stop();
}

View File

@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts
from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,30 +880,41 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
class _LeadershipTransferred(Exception):
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
pass
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
Returns the next current_key value.
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
restart, signalling the caller to retry.
"""
# Ensure servers[1] is not the topology coordinator. If the coordinator is
# restarted, the Raft leader dies, a new election occurs, and the new
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
# and marking post-repair data as repaired. That legitimate re-repair masks
# the compaction-merge bug this test detects.
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
if coord_serv == servers[1]:
other = next(s for s in servers if s != servers[1])
await ensure_group0_leader_on(manager, other)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark()
@@ -967,16 +978,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
await manager.server_start(target.server_id)
await manager.servers_see_each_other(servers)
# Check if leadership transferred to servers[1] during the restart.
# If so, the new coordinator will re-initiate repair, masking the bug.
new_coord = await get_topology_coordinator(manager)
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
if new_coord_serv == servers[1]:
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
await manager.api.wait_task(servers[0].ip_addr, task_id)
raise _LeadershipTransferred(
"servers[1] became topology coordinator after restart")
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
# confirming that the bug was triggered (S1' and E merged during the race window).
deadline = time.time() + 60
@@ -999,7 +1000,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
if not compaction_ran:
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
"the bug was not triggered. Skipping assertion.")
return current_key
return
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1030,9 +1031,8 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
# servers[0] and servers[2] were never restarted and the coordinator stayed
# alive throughout, so no re-repair could have flushed their memtables.
# Post-repair keys must NOT appear in repaired sstables on these servers.
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,54 +1053,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
return current_key
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
# If leadership transfers to servers[1] between our coordinator check and the
# restart, the coordinator change masks the bug. Detect and retry.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
current_key = await _do_race_window_promotes_unrepaired_data(
manager, servers, cql, ks, token, scylla_path, current_key)
return
except _LeadershipTransferred as e:
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
"could not run the test without coordinator interference.")
# ----------------------------------------------------------------------------
# Tombstone GC safety tests

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
from test.cluster.util import get_topology_coordinator, trigger_stepdown
import pytest
import logging
@@ -83,78 +83,3 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager)
if coord3:
break
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
"""Verify that _tablet_load_stats_refresh is properly joined during
topology coordinator shutdown, even when a schema change notification
triggers a refresh between run() completing and stop() being called.
Reproduces the scenario using two injection points:
- topology_coordinator_pause_before_stop: pauses after run() finishes
but before stop() is called
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
so it's still in-flight during shutdown
Without the join in stop(), the refresh task outlives the coordinator
and accesses freed memory.
"""
servers = await manager.servers_add(3)
await manager.get_ready_cql(servers)
async with new_test_keyspace(manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
coord = await get_topology_coordinator(manager)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
coord_idx = host_ids.index(coord)
coord_server = servers[coord_idx]
log = await manager.server_open_log(coord_server.server_id)
mark = await log.mark()
# Injection B: pause between run() returning and stop() being called.
await manager.api.enable_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
# Stepdown causes the topology coordinator to abort and shut down.
logger.info("Triggering stepdown on coordinator")
await trigger_stepdown(manager, coord_server)
# Wait for injection B to fire. The coordinator has finished run() but
# the schema change listener is still registered.
mark, _ = await log.wait_for(
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
# Enable it now so it only catches the notification-triggered call.
await manager.api.enable_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
# CREATE TABLE fires on_create_column_family on the old coordinator which
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
# via with_scheduling_group on the gossip scheduling group.
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
# Release injection B: coordinator proceeds through stop().
# Without the fix, stop() returns quickly and run_topology_coordinator
# frees the topology_coordinator frame. With the fix, stop() blocks at
# _tablet_load_stats_refresh.join() until injection A is released.
logger.info("Releasing injection B: coordinator will stop")
await manager.api.message_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
# Release injection A: refresh_tablet_load_stats() resumes and accesses
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
# points to freed memory and ASan detects heap-use-after-free.
logger.info("Releasing injection A: refresh resumes")
await manager.api.message_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
# If the bug is present, the node crashed. read_barrier will fail.
await read_barrier(manager.api, coord_server.ip_addr)

View File

@@ -435,9 +435,8 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -452,44 +451,43 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
await cql.run_async("create table ks2.t (pk int primary key);")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert repl['dc1'] == '1'
assert repl['dc2'] == '2'
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks3.t (pk int primary key);")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
repl = await get_replication_options("ks3")
assert repl['dc1'] == '1'
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
await cql.run_async("create table ks4.t (pk int primary key);")
repl = await get_replication_options("ks4", host, servers[0].ip_addr)
repl = await get_replication_options("ks4")
assert repl['dc1'] == '1'
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks5.t (pk int primary key);")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
repl = await get_replication_options("ks5")
assert repl['dc1'] == '2'
assert repl['dc2'] == '2'
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks6.t (pk int primary key);")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
repl = await get_replication_options("ks6")
assert repl['dc1'] == '2'
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b']
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
@@ -499,7 +497,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
assert r.replicas[0][0] == host_ids[1]
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert repl['dc1'] == ['rack1a']
assert len(repl['dc2']) == 2
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
@@ -525,13 +523,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
pass
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
repl = await get_replication_options("ks5")
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
assert repl['dc2'] == '2'
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
repl = await get_replication_options("ks6")
assert repl['dc1'] == '2'
assert len(repl['dc2']) == 1
assert repl['dc2'][0] == 'rack2a'
@@ -539,9 +537,8 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -554,11 +551,10 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
@@ -578,19 +574,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
servers = servers[0:-1]
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == ['rack1b']
logging.info("Rolling restart")
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])
await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
repl = await get_replication_options("ks2")
assert len(repl['dc1']) == 2
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
repl = await get_replication_options("ks3")
assert len(repl['dc1']) == 1
assert len(repl['dc2']) == 1
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
@@ -606,7 +602,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
assert failed
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert len(repl['dc1']) == 1
assert repl['dc1'][0] == 'rack1b'
assert len(repl['dc2']) == 1
@@ -1113,9 +1109,8 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
async def get_replication_options(ks: str, host, ip_addr):
await read_barrier(manager.api, ip_addr)
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
async def get_replication_options(ks: str):
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
return repl
@@ -1133,11 +1128,10 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
cql = manager.get_cql()
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
await cql.run_async("create table ks1.t (pk int primary key);")
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
@@ -1171,7 +1165,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
failed = True
assert failed
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
repl = await get_replication_options("ks1")
assert repl['dc1'] == '1'
@pytest.mark.asyncio

View File

@@ -18,7 +18,6 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "ldap_common.hh"
#include "service/migration_manager.hh"
@@ -682,41 +681,3 @@ SEASTAR_TEST_CASE(ldap_config) {
},
make_ldap_config());
}
// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
auto cfg = make_ldap_config();
cfg->permissions_update_interval_in_ms.set(1);
auto call_count = seastar::make_lw_shared<int>(0);
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
auto& cache = env.auth_cache().local();
testlog.info("Populating 50 cache entries");
for (int i = 0; i < 50; i++) {
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
cache.get_permissions(auth::role_or_anonymous(), r).get();
}
testlog.info("Installing slow permission loader (10ms per call)");
cache.set_permission_loader(
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
-> seastar::future<auth::permission_set> {
++(*call_count);
co_await seastar::sleep(std::chrono::milliseconds(10));
co_return auth::permission_set();
});
testlog.info("Waiting for pruner to start reloading");
while (*call_count == 0) {
seastar::sleep(std::chrono::milliseconds(1)).get();
}
testlog.info("Pruner started, letting teardown run");
}, cfg);
testlog.info("Loader called {} times", *call_count);
}

View File

@@ -126,9 +126,6 @@ class CppFile(pytest.File, ABC):
return args
def collect(self) -> Iterator[CppTestCase]:
if BUILD_MODE not in self.stash:
return
custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
for test_case in self.list_test_cases():

View File

@@ -163,11 +163,6 @@ def scylla_binary(testpy_test) -> Path:
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
items[:] = [
item for item in items
if (parent_file := item.getparent(cls=pytest.File)) is not None
and BUILD_MODE in parent_file.stash
]
for item in items:
modify_pytest_item(item=item)
@@ -290,10 +285,7 @@ def pytest_configure(config: pytest.Config) -> None:
pytest_log_dir.mkdir(parents=True, exist_ok=True)
if not _pytest_config.getoption("--save-log-on-success"):
for file in pytest_log_dir.glob("*"):
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them.
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
# condition, especially with repeat.
file.unlink(missing_ok=True)
file.unlink()
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
@@ -348,8 +340,7 @@ def pytest_collect_file(file_path: pathlib.Path,
repeats = list(product(build_modes, parent.config.run_ids))
if not repeats:
parent.stash[REPEATING_FILES].remove(file_path)
return collectors
return []
ihook = parent.ihook
collectors = list(chain(collectors, chain.from_iterable(

View File

@@ -75,7 +75,6 @@ def test_no_bare_skip_markers_in_collection():
"--collect-only",
"--ignore=boost", "--ignore=raft",
"--ignore=ldap", "--ignore=vector_search",
"--ignore=unit",
"-p", "no:sugar"],
capture_output=True, text=True,
cwd=str(_TEST_ROOT),