mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 20:27:03 +00:00
Compare commits
19 Commits
scylla-202
...
next
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c4de2b3c9d | ||
|
|
d9dd3bfe53 | ||
|
|
b0f988afc4 | ||
|
|
a7e9c0e6d2 | ||
|
|
3ea4af1c8c | ||
|
|
459e3970cd | ||
|
|
8756f7c068 | ||
|
|
2615d0e8d8 | ||
|
|
914b70c75b | ||
|
|
6b7ce5e244 | ||
|
|
9d3d424d58 | ||
|
|
86472e43e1 | ||
|
|
f2f4915e09 | ||
|
|
92c09d106d | ||
|
|
8855e77465 | ||
|
|
adf1e26bab | ||
|
|
37a547604f | ||
|
|
c3e5285d45 | ||
|
|
f75e5ac65b |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2026.2.0-rc0
|
||||
VERSION=2026.3.0-dev
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -258,13 +258,11 @@ future<> ldap_role_manager::start() {
|
||||
} catch (const seastar::sleep_aborted&) {
|
||||
co_return; // ignore
|
||||
}
|
||||
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
|
||||
try {
|
||||
co_await c.reload_all_permissions();
|
||||
} catch (...) {
|
||||
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
try {
|
||||
co_await _cache.reload_all_permissions();
|
||||
} catch (...) {
|
||||
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
return _std_mgr.start();
|
||||
|
||||
@@ -157,15 +157,12 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
||||
return create_legacy_keyspace_if_missing(mm);
|
||||
});
|
||||
}
|
||||
co_await _role_manager->start();
|
||||
if (this_shard_id() == 0) {
|
||||
// Role manager and password authenticator have this odd startup
|
||||
// mechanism where they asynchronously create the superuser role
|
||||
// in the background. Correct password creation depends on role
|
||||
// creation therefore we need to wait here.
|
||||
co_await _role_manager->ensure_superuser_is_created();
|
||||
}
|
||||
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
|
||||
// Authorizer must be started before the permission loader is set,
|
||||
// because the loader calls _authorizer->authorize().
|
||||
// The loader must be set before starting the role manager, because
|
||||
// LDAP role manager starts a pruner fiber that calls
|
||||
// reload_all_permissions() which asserts _permission_loader is set.
|
||||
co_await _authorizer->start();
|
||||
if (!_used_by_maintenance_socket) {
|
||||
// Maintenance socket mode can't cache permissions because it has
|
||||
// different authorizer. We can't mix cached permissions, they could be
|
||||
@@ -174,12 +171,27 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
||||
&service::get_uncached_permissions,
|
||||
this, std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
co_await _role_manager->start();
|
||||
if (this_shard_id() == 0) {
|
||||
// Role manager and password authenticator have this odd startup
|
||||
// mechanism where they asynchronously create the superuser role
|
||||
// in the background. Correct password creation depends on role
|
||||
// creation therefore we need to wait here.
|
||||
co_await _role_manager->ensure_superuser_is_created();
|
||||
}
|
||||
// Authenticator must be started after ensure_superuser_is_created()
|
||||
// because password_authenticator queries system.roles for the
|
||||
// superuser entry created by the role manager.
|
||||
co_await _authenticator->start();
|
||||
}
|
||||
|
||||
future<> service::stop() {
|
||||
_as.request_abort();
|
||||
// Reverse of start() order.
|
||||
co_await _authenticator->stop();
|
||||
co_await _role_manager->stop();
|
||||
_cache.set_permission_loader(nullptr);
|
||||
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
|
||||
co_await _authorizer->stop();
|
||||
}
|
||||
|
||||
future<> service::ensure_superuser_is_created() {
|
||||
|
||||
@@ -4237,6 +4237,7 @@ public:
|
||||
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
||||
, _async_gate("topology_coordinator")
|
||||
{
|
||||
_lifecycle_notifier.register_subscriber(this);
|
||||
_db.get_notifier().register_listener(this);
|
||||
// When the delay_cdc_stream_finalization error injection is disabled
|
||||
// (test releases it), wake the topology coordinator so it retries
|
||||
@@ -4400,6 +4401,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
|
||||
}
|
||||
|
||||
future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
|
||||
auto tm = get_token_metadata_ptr();
|
||||
|
||||
locator::load_stats stats;
|
||||
@@ -4723,7 +4725,6 @@ future<> topology_coordinator::run() {
|
||||
|
||||
co_await _async_gate.close();
|
||||
co_await std::move(tablet_load_stats_refresher);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
co_await std::move(cdc_generation_publisher);
|
||||
co_await std::move(cdc_streams_gc);
|
||||
co_await std::move(gossiper_orphan_remover);
|
||||
@@ -4736,6 +4737,8 @@ future<> topology_coordinator::stop() {
|
||||
co_await _db.get_notifier().unregister_listener(this);
|
||||
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
|
||||
_topo_sm.on_tablet_split_ready = nullptr;
|
||||
co_await _lifecycle_notifier.unregister_subscriber(this);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
|
||||
// if topology_coordinator::run() is aborted either because we are not a
|
||||
// leader anymore, or we are shutting down as a leader, we have to handle
|
||||
@@ -4797,7 +4800,6 @@ future<> run_topology_coordinator(
|
||||
topology_cmd_rpc_tracker};
|
||||
|
||||
std::exception_ptr ex;
|
||||
lifecycle_notifier.register_subscriber(&coordinator);
|
||||
try {
|
||||
rtlogger.info("start topology coordinator fiber");
|
||||
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
|
||||
@@ -4818,7 +4820,7 @@ future<> run_topology_coordinator(
|
||||
}
|
||||
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
|
||||
}
|
||||
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
|
||||
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
|
||||
co_await coordinator.stop();
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
|
||||
from cassandra.query import ConsistencyLevel, SimpleStatement
|
||||
@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
|
||||
# affected replica but process the UNREPAIRED sstable on the others, so the classification
|
||||
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
|
||||
# on the affected replica leading to data resurrection.
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||
cmdline = ['--hinted-handoff-enabled', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||
|
||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||
)
|
||||
class _LeadershipTransferred(Exception):
|
||||
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
|
||||
pass
|
||||
|
||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||
for s in servers:
|
||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||
|
||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||
|
||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||
# S0'(repaired_at=1) on all nodes.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
|
||||
# These will be the subject of repair 2.
|
||||
repair2_keys = list(range(current_key, current_key + 10))
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
current_key += 10
|
||||
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
|
||||
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
|
||||
|
||||
Returns the next current_key value.
|
||||
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
|
||||
restart, signalling the caller to retry.
|
||||
"""
|
||||
# Ensure servers[1] is not the topology coordinator. If the coordinator is
|
||||
# restarted, the Raft leader dies, a new election occurs, and the new
|
||||
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
|
||||
# and marking post-repair data as repaired. That legitimate re-repair masks
|
||||
# the compaction-merge bug this test detects.
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
if coord_serv == servers[1]:
|
||||
other = next(s for s in servers if s != servers[1])
|
||||
await ensure_group0_leader_on(manager, other)
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
coord_mark = await coord_log.mark()
|
||||
|
||||
@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
await manager.server_start(target.server_id)
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
# Check if leadership transferred to servers[1] during the restart.
|
||||
# If so, the new coordinator will re-initiate repair, masking the bug.
|
||||
new_coord = await get_topology_coordinator(manager)
|
||||
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
|
||||
if new_coord_serv == servers[1]:
|
||||
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
|
||||
await manager.api.wait_task(servers[0].ip_addr, task_id)
|
||||
raise _LeadershipTransferred(
|
||||
"servers[1] became topology coordinator after restart")
|
||||
|
||||
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
|
||||
# confirming that the bug was triggered (S1' and E merged during the race window).
|
||||
deadline = time.time() + 60
|
||||
@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
if not compaction_ran:
|
||||
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
|
||||
"the bug was not triggered. Skipping assertion.")
|
||||
return
|
||||
return current_key
|
||||
|
||||
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
|
||||
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
|
||||
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
|
||||
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
|
||||
|
||||
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
|
||||
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
|
||||
# servers[0] and servers[2] were never restarted and the coordinator stayed
|
||||
# alive throughout, so no re-repair could have flushed their memtables.
|
||||
# Post-repair keys must NOT appear in repaired sstables on these servers.
|
||||
assert not (repaired_keys_0 & post_repair_key_set), \
|
||||
f"servers[0] should not have post-repair keys in repaired sstables, " \
|
||||
f"got: {repaired_keys_0 & post_repair_key_set}"
|
||||
@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
|
||||
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
|
||||
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
|
||||
return current_key
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||
cmdline = ['--hinted-handoff-enabled', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||
|
||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||
)
|
||||
|
||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||
for s in servers:
|
||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||
|
||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||
|
||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||
# S0'(repaired_at=1) on all nodes.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
|
||||
# These will be the subject of repair 2.
|
||||
repair2_keys = list(range(current_key, current_key + 10))
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
current_key += 10
|
||||
|
||||
# If leadership transfers to servers[1] between our coordinator check and the
|
||||
# restart, the coordinator change masks the bug. Detect and retry.
|
||||
max_attempts = 5
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
current_key = await _do_race_window_promotes_unrepaired_data(
|
||||
manager, servers, cql, ks, token, scylla_path, current_key)
|
||||
return
|
||||
except _LeadershipTransferred as e:
|
||||
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
|
||||
|
||||
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
|
||||
"could not run the test without coordinator interference.")
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Tombstone GC safety tests
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
#
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.util import get_topology_coordinator, trigger_stepdown
|
||||
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
|
||||
|
||||
import pytest
|
||||
import logging
|
||||
@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
|
||||
coord3 = await get_topology_coordinator(manager)
|
||||
if coord3:
|
||||
break
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
|
||||
"""Verify that _tablet_load_stats_refresh is properly joined during
|
||||
topology coordinator shutdown, even when a schema change notification
|
||||
triggers a refresh between run() completing and stop() being called.
|
||||
|
||||
Reproduces the scenario using two injection points:
|
||||
- topology_coordinator_pause_before_stop: pauses after run() finishes
|
||||
but before stop() is called
|
||||
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
|
||||
so it's still in-flight during shutdown
|
||||
|
||||
Without the join in stop(), the refresh task outlives the coordinator
|
||||
and accesses freed memory.
|
||||
"""
|
||||
servers = await manager.servers_add(3)
|
||||
await manager.get_ready_cql(servers)
|
||||
|
||||
async with new_test_keyspace(manager,
|
||||
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
coord = await get_topology_coordinator(manager)
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
coord_idx = host_ids.index(coord)
|
||||
coord_server = servers[coord_idx]
|
||||
|
||||
log = await manager.server_open_log(coord_server.server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
# Injection B: pause between run() returning and stop() being called.
|
||||
await manager.api.enable_injection(
|
||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
|
||||
|
||||
# Stepdown causes the topology coordinator to abort and shut down.
|
||||
logger.info("Triggering stepdown on coordinator")
|
||||
await trigger_stepdown(manager, coord_server)
|
||||
|
||||
# Wait for injection B to fire. The coordinator has finished run() but
|
||||
# the schema change listener is still registered.
|
||||
mark, _ = await log.wait_for(
|
||||
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
|
||||
|
||||
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
|
||||
# Enable it now so it only catches the notification-triggered call.
|
||||
await manager.api.enable_injection(
|
||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
|
||||
|
||||
# CREATE TABLE fires on_create_column_family on the old coordinator which
|
||||
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
|
||||
# via with_scheduling_group on the gossip scheduling group.
|
||||
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
|
||||
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
|
||||
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
|
||||
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
|
||||
|
||||
# Release injection B: coordinator proceeds through stop().
|
||||
# Without the fix, stop() returns quickly and run_topology_coordinator
|
||||
# frees the topology_coordinator frame. With the fix, stop() blocks at
|
||||
# _tablet_load_stats_refresh.join() until injection A is released.
|
||||
logger.info("Releasing injection B: coordinator will stop")
|
||||
await manager.api.message_injection(
|
||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
|
||||
|
||||
# Release injection A: refresh_tablet_load_stats() resumes and accesses
|
||||
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
|
||||
# points to freed memory and ASan detects heap-use-after-free.
|
||||
logger.info("Releasing injection A: refresh resumes")
|
||||
await manager.api.message_injection(
|
||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
|
||||
|
||||
# If the bug is present, the node crashed. read_barrier will fail.
|
||||
await read_barrier(manager.api, coord_server.ip_addr)
|
||||
|
||||
@@ -435,8 +435,9 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
async def get_replication_options(ks: str):
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
||||
async def get_replication_options(ks: str, host, ip_addr):
|
||||
await read_barrier(manager.api, ip_addr)
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
|
||||
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
|
||||
return repl
|
||||
|
||||
@@ -451,43 +452,44 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
|
||||
cql = manager.get_cql()
|
||||
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
|
||||
|
||||
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks1.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks1")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
|
||||
await cql.run_async("create table ks2.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks2")
|
||||
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '1'
|
||||
assert repl['dc2'] == '2'
|
||||
|
||||
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
|
||||
await cql.run_async("create table ks3.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks3")
|
||||
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
|
||||
await cql.run_async("create table ks4.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks4")
|
||||
repl = await get_replication_options("ks4", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks5.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks5")
|
||||
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '2'
|
||||
assert repl['dc2'] == '2'
|
||||
|
||||
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks6.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks6")
|
||||
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '2'
|
||||
|
||||
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
|
||||
|
||||
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
|
||||
repl = await get_replication_options("ks1")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == ['rack1b']
|
||||
|
||||
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
|
||||
@@ -497,7 +499,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
assert r.replicas[0][0] == host_ids[1]
|
||||
|
||||
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
|
||||
repl = await get_replication_options("ks2")
|
||||
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == ['rack1a']
|
||||
assert len(repl['dc2']) == 2
|
||||
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
|
||||
@@ -523,13 +525,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
pass
|
||||
|
||||
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
|
||||
repl = await get_replication_options("ks5")
|
||||
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
|
||||
assert len(repl['dc1']) == 2
|
||||
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
|
||||
assert repl['dc2'] == '2'
|
||||
|
||||
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
|
||||
repl = await get_replication_options("ks6")
|
||||
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '2'
|
||||
assert len(repl['dc2']) == 1
|
||||
assert repl['dc2'][0] == 'rack2a'
|
||||
@@ -537,8 +539,9 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
async def get_replication_options(ks: str):
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
||||
async def get_replication_options(ks: str, host, ip_addr):
|
||||
await read_barrier(manager.api, ip_addr)
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
|
||||
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
|
||||
return repl
|
||||
|
||||
@@ -551,10 +554,11 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
|
||||
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
|
||||
|
||||
cql = manager.get_cql()
|
||||
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
|
||||
|
||||
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks1.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks1")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
|
||||
@@ -574,19 +578,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
|
||||
servers = servers[0:-1]
|
||||
|
||||
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
|
||||
repl = await get_replication_options("ks1")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == ['rack1b']
|
||||
|
||||
logging.info("Rolling restart")
|
||||
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])
|
||||
|
||||
await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
|
||||
repl = await get_replication_options("ks2")
|
||||
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
|
||||
assert len(repl['dc1']) == 2
|
||||
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
|
||||
|
||||
await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
|
||||
repl = await get_replication_options("ks3")
|
||||
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
|
||||
assert len(repl['dc1']) == 1
|
||||
assert len(repl['dc2']) == 1
|
||||
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
|
||||
@@ -602,7 +606,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
|
||||
assert failed
|
||||
|
||||
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
|
||||
repl = await get_replication_options("ks1")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
assert len(repl['dc1']) == 1
|
||||
assert repl['dc1'][0] == 'rack1b'
|
||||
assert len(repl['dc2']) == 1
|
||||
@@ -1109,8 +1113,9 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
async def get_replication_options(ks: str):
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
||||
async def get_replication_options(ks: str, host, ip_addr):
|
||||
await read_barrier(manager.api, ip_addr)
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
|
||||
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
|
||||
return repl
|
||||
|
||||
@@ -1128,10 +1133,11 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
|
||||
cql = manager.get_cql()
|
||||
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
|
||||
|
||||
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks1.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks1")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
|
||||
@@ -1165,7 +1171,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
|
||||
failed = True
|
||||
assert failed
|
||||
|
||||
repl = await get_replication_options("ks1")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
|
||||
#include "test/lib/exception_utils.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "ldap_common.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
@@ -681,3 +682,41 @@ SEASTAR_TEST_CASE(ldap_config) {
|
||||
},
|
||||
make_ldap_config());
|
||||
}
|
||||
|
||||
// Reproduces the race between the cache pruner and the permission
|
||||
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
|
||||
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
|
||||
auto cfg = make_ldap_config();
|
||||
cfg->permissions_update_interval_in_ms.set(1);
|
||||
|
||||
auto call_count = seastar::make_lw_shared<int>(0);
|
||||
|
||||
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
|
||||
auto& cache = env.auth_cache().local();
|
||||
|
||||
testlog.info("Populating 50 cache entries");
|
||||
for (int i = 0; i < 50; i++) {
|
||||
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
|
||||
cache.get_permissions(auth::role_or_anonymous(), r).get();
|
||||
}
|
||||
|
||||
testlog.info("Installing slow permission loader (10ms per call)");
|
||||
cache.set_permission_loader(
|
||||
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
|
||||
-> seastar::future<auth::permission_set> {
|
||||
++(*call_count);
|
||||
co_await seastar::sleep(std::chrono::milliseconds(10));
|
||||
co_return auth::permission_set();
|
||||
});
|
||||
|
||||
testlog.info("Waiting for pruner to start reloading");
|
||||
while (*call_count == 0) {
|
||||
seastar::sleep(std::chrono::milliseconds(1)).get();
|
||||
}
|
||||
|
||||
testlog.info("Pruner started, letting teardown run");
|
||||
}, cfg);
|
||||
|
||||
testlog.info("Loader called {} times", *call_count);
|
||||
}
|
||||
|
||||
|
||||
@@ -126,6 +126,9 @@ class CppFile(pytest.File, ABC):
|
||||
return args
|
||||
|
||||
def collect(self) -> Iterator[CppTestCase]:
|
||||
if BUILD_MODE not in self.stash:
|
||||
return
|
||||
|
||||
custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
|
||||
|
||||
for test_case in self.list_test_cases():
|
||||
|
||||
@@ -163,6 +163,11 @@ def scylla_binary(testpy_test) -> Path:
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
|
||||
items[:] = [
|
||||
item for item in items
|
||||
if (parent_file := item.getparent(cls=pytest.File)) is not None
|
||||
and BUILD_MODE in parent_file.stash
|
||||
]
|
||||
for item in items:
|
||||
modify_pytest_item(item=item)
|
||||
|
||||
@@ -285,7 +290,10 @@ def pytest_configure(config: pytest.Config) -> None:
|
||||
pytest_log_dir.mkdir(parents=True, exist_ok=True)
|
||||
if not _pytest_config.getoption("--save-log-on-success"):
|
||||
for file in pytest_log_dir.glob("*"):
|
||||
file.unlink()
|
||||
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them.
|
||||
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
|
||||
# condition, especially with repeat.
|
||||
file.unlink(missing_ok=True)
|
||||
|
||||
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
|
||||
|
||||
@@ -340,7 +348,8 @@ def pytest_collect_file(file_path: pathlib.Path,
|
||||
repeats = list(product(build_modes, parent.config.run_ids))
|
||||
|
||||
if not repeats:
|
||||
return []
|
||||
parent.stash[REPEATING_FILES].remove(file_path)
|
||||
return collectors
|
||||
|
||||
ihook = parent.ihook
|
||||
collectors = list(chain(collectors, chain.from_iterable(
|
||||
|
||||
@@ -75,6 +75,7 @@ def test_no_bare_skip_markers_in_collection():
|
||||
"--collect-only",
|
||||
"--ignore=boost", "--ignore=raft",
|
||||
"--ignore=ldap", "--ignore=vector_search",
|
||||
"--ignore=unit",
|
||||
"-p", "no:sugar"],
|
||||
capture_output=True, text=True,
|
||||
cwd=str(_TEST_ROOT),
|
||||
|
||||
Reference in New Issue
Block a user