mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-29 04:37:00 +00:00
Compare commits
2 Commits
master
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93fbc0a683 | ||
|
|
520466b407 |
@@ -258,11 +258,13 @@ future<> ldap_role_manager::start() {
|
||||
} catch (const seastar::sleep_aborted&) {
|
||||
co_return; // ignore
|
||||
}
|
||||
try {
|
||||
co_await _cache.reload_all_permissions();
|
||||
} catch (...) {
|
||||
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
||||
}
|
||||
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
|
||||
try {
|
||||
co_await c.reload_all_permissions();
|
||||
} catch (...) {
|
||||
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
return _std_mgr.start();
|
||||
|
||||
@@ -157,20 +157,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
||||
return create_legacy_keyspace_if_missing(mm);
|
||||
});
|
||||
}
|
||||
// Authorizer must be started before the permission loader is set,
|
||||
// because the loader calls _authorizer->authorize().
|
||||
// The loader must be set before starting the role manager, because
|
||||
// LDAP role manager starts a pruner fiber that calls
|
||||
// reload_all_permissions() which asserts _permission_loader is set.
|
||||
co_await _authorizer->start();
|
||||
if (!_used_by_maintenance_socket) {
|
||||
// Maintenance socket mode can't cache permissions because it has
|
||||
// different authorizer. We can't mix cached permissions, they could be
|
||||
// different in normal mode.
|
||||
_cache.set_permission_loader(std::bind(
|
||||
&service::get_uncached_permissions,
|
||||
this, std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
co_await _role_manager->start();
|
||||
if (this_shard_id() == 0) {
|
||||
// Role manager and password authenticator have this odd startup
|
||||
@@ -179,19 +165,21 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
||||
// creation therefore we need to wait here.
|
||||
co_await _role_manager->ensure_superuser_is_created();
|
||||
}
|
||||
// Authenticator must be started after ensure_superuser_is_created()
|
||||
// because password_authenticator queries system.roles for the
|
||||
// superuser entry created by the role manager.
|
||||
co_await _authenticator->start();
|
||||
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
|
||||
if (!_used_by_maintenance_socket) {
|
||||
// Maintenance socket mode can't cache permissions because it has
|
||||
// different authorizer. We can't mix cached permissions, they could be
|
||||
// different in normal mode.
|
||||
_cache.set_permission_loader(std::bind(
|
||||
&service::get_uncached_permissions,
|
||||
this, std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
}
|
||||
|
||||
future<> service::stop() {
|
||||
_as.request_abort();
|
||||
// Reverse of start() order.
|
||||
co_await _authenticator->stop();
|
||||
co_await _role_manager->stop();
|
||||
_cache.set_permission_loader(nullptr);
|
||||
co_await _authorizer->stop();
|
||||
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
|
||||
}
|
||||
|
||||
future<> service::ensure_superuser_is_created() {
|
||||
|
||||
@@ -71,7 +71,7 @@ used. If it is used, the statement will be a no-op if the materialized view alre
|
||||
MV Select Statement
|
||||
...................
|
||||
|
||||
The select statement of a materialized view creation defines which of the base table is included in the view. That
|
||||
The select statement of a materialized view creation defines which of the base table columns are included in the view. That
|
||||
statement is limited in a number of ways:
|
||||
|
||||
- The :ref:`selection <selection-clause>` is limited to those that only select columns of the base table. In other
|
||||
|
||||
@@ -4237,7 +4237,6 @@ public:
|
||||
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
||||
, _async_gate("topology_coordinator")
|
||||
{
|
||||
_lifecycle_notifier.register_subscriber(this);
|
||||
_db.get_notifier().register_listener(this);
|
||||
// When the delay_cdc_stream_finalization error injection is disabled
|
||||
// (test releases it), wake the topology coordinator so it retries
|
||||
@@ -4401,7 +4400,6 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
|
||||
}
|
||||
|
||||
future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
|
||||
auto tm = get_token_metadata_ptr();
|
||||
|
||||
locator::load_stats stats;
|
||||
@@ -4725,6 +4723,7 @@ future<> topology_coordinator::run() {
|
||||
|
||||
co_await _async_gate.close();
|
||||
co_await std::move(tablet_load_stats_refresher);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
co_await std::move(cdc_generation_publisher);
|
||||
co_await std::move(cdc_streams_gc);
|
||||
co_await std::move(gossiper_orphan_remover);
|
||||
@@ -4737,8 +4736,6 @@ future<> topology_coordinator::stop() {
|
||||
co_await _db.get_notifier().unregister_listener(this);
|
||||
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
|
||||
_topo_sm.on_tablet_split_ready = nullptr;
|
||||
co_await _lifecycle_notifier.unregister_subscriber(this);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
|
||||
// if topology_coordinator::run() is aborted either because we are not a
|
||||
// leader anymore, or we are shutting down as a leader, we have to handle
|
||||
@@ -4800,6 +4797,7 @@ future<> run_topology_coordinator(
|
||||
topology_cmd_rpc_tracker};
|
||||
|
||||
std::exception_ptr ex;
|
||||
lifecycle_notifier.register_subscriber(&coordinator);
|
||||
try {
|
||||
rtlogger.info("start topology coordinator fiber");
|
||||
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
|
||||
@@ -4820,7 +4818,7 @@ future<> run_topology_coordinator(
|
||||
}
|
||||
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
|
||||
}
|
||||
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
|
||||
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
|
||||
co_await coordinator.stop();
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
|
||||
from cassandra.query import ConsistencyLevel, SimpleStatement
|
||||
@@ -880,30 +880,41 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
|
||||
# affected replica but process the UNREPAIRED sstable on the others, so the classification
|
||||
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
|
||||
# on the affected replica leading to data resurrection.
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||
cmdline = ['--hinted-handoff-enabled', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||
|
||||
class _LeadershipTransferred(Exception):
|
||||
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
|
||||
pass
|
||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||
)
|
||||
|
||||
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
|
||||
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
|
||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||
for s in servers:
|
||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||
|
||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||
|
||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||
# S0'(repaired_at=1) on all nodes.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
|
||||
# These will be the subject of repair 2.
|
||||
repair2_keys = list(range(current_key, current_key + 10))
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
current_key += 10
|
||||
|
||||
Returns the next current_key value.
|
||||
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
|
||||
restart, signalling the caller to retry.
|
||||
"""
|
||||
# Ensure servers[1] is not the topology coordinator. If the coordinator is
|
||||
# restarted, the Raft leader dies, a new election occurs, and the new
|
||||
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
|
||||
# and marking post-repair data as repaired. That legitimate re-repair masks
|
||||
# the compaction-merge bug this test detects.
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
if coord_serv == servers[1]:
|
||||
other = next(s for s in servers if s != servers[1])
|
||||
await ensure_group0_leader_on(manager, other)
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
coord_mark = await coord_log.mark()
|
||||
|
||||
@@ -967,16 +978,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
||||
await manager.server_start(target.server_id)
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
# Check if leadership transferred to servers[1] during the restart.
|
||||
# If so, the new coordinator will re-initiate repair, masking the bug.
|
||||
new_coord = await get_topology_coordinator(manager)
|
||||
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
|
||||
if new_coord_serv == servers[1]:
|
||||
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
|
||||
await manager.api.wait_task(servers[0].ip_addr, task_id)
|
||||
raise _LeadershipTransferred(
|
||||
"servers[1] became topology coordinator after restart")
|
||||
|
||||
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
|
||||
# confirming that the bug was triggered (S1' and E merged during the race window).
|
||||
deadline = time.time() + 60
|
||||
@@ -999,7 +1000,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
||||
if not compaction_ran:
|
||||
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
|
||||
"the bug was not triggered. Skipping assertion.")
|
||||
return current_key
|
||||
return
|
||||
|
||||
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
|
||||
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
|
||||
@@ -1030,9 +1031,8 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
||||
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
|
||||
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
|
||||
|
||||
# servers[0] and servers[2] were never restarted and the coordinator stayed
|
||||
# alive throughout, so no re-repair could have flushed their memtables.
|
||||
# Post-repair keys must NOT appear in repaired sstables on these servers.
|
||||
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
|
||||
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
|
||||
assert not (repaired_keys_0 & post_repair_key_set), \
|
||||
f"servers[0] should not have post-repair keys in repaired sstables, " \
|
||||
f"got: {repaired_keys_0 & post_repair_key_set}"
|
||||
@@ -1053,54 +1053,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
||||
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
|
||||
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
|
||||
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
|
||||
return current_key
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||
cmdline = ['--hinted-handoff-enabled', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||
|
||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||
)
|
||||
|
||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||
for s in servers:
|
||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||
|
||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||
|
||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||
# S0'(repaired_at=1) on all nodes.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
|
||||
# These will be the subject of repair 2.
|
||||
repair2_keys = list(range(current_key, current_key + 10))
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
current_key += 10
|
||||
|
||||
# If leadership transfers to servers[1] between our coordinator check and the
|
||||
# restart, the coordinator change masks the bug. Detect and retry.
|
||||
max_attempts = 5
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
current_key = await _do_race_window_promotes_unrepaired_data(
|
||||
manager, servers, cql, ks, token, scylla_path, current_key)
|
||||
return
|
||||
except _LeadershipTransferred as e:
|
||||
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
|
||||
|
||||
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
|
||||
"could not run the test without coordinator interference.")
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Tombstone GC safety tests
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
#
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
|
||||
from test.cluster.util import get_topology_coordinator, trigger_stepdown
|
||||
|
||||
import pytest
|
||||
import logging
|
||||
@@ -83,78 +83,3 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
|
||||
coord3 = await get_topology_coordinator(manager)
|
||||
if coord3:
|
||||
break
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
|
||||
"""Verify that _tablet_load_stats_refresh is properly joined during
|
||||
topology coordinator shutdown, even when a schema change notification
|
||||
triggers a refresh between run() completing and stop() being called.
|
||||
|
||||
Reproduces the scenario using two injection points:
|
||||
- topology_coordinator_pause_before_stop: pauses after run() finishes
|
||||
but before stop() is called
|
||||
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
|
||||
so it's still in-flight during shutdown
|
||||
|
||||
Without the join in stop(), the refresh task outlives the coordinator
|
||||
and accesses freed memory.
|
||||
"""
|
||||
servers = await manager.servers_add(3)
|
||||
await manager.get_ready_cql(servers)
|
||||
|
||||
async with new_test_keyspace(manager,
|
||||
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
coord = await get_topology_coordinator(manager)
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
coord_idx = host_ids.index(coord)
|
||||
coord_server = servers[coord_idx]
|
||||
|
||||
log = await manager.server_open_log(coord_server.server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
# Injection B: pause between run() returning and stop() being called.
|
||||
await manager.api.enable_injection(
|
||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
|
||||
|
||||
# Stepdown causes the topology coordinator to abort and shut down.
|
||||
logger.info("Triggering stepdown on coordinator")
|
||||
await trigger_stepdown(manager, coord_server)
|
||||
|
||||
# Wait for injection B to fire. The coordinator has finished run() but
|
||||
# the schema change listener is still registered.
|
||||
mark, _ = await log.wait_for(
|
||||
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
|
||||
|
||||
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
|
||||
# Enable it now so it only catches the notification-triggered call.
|
||||
await manager.api.enable_injection(
|
||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
|
||||
|
||||
# CREATE TABLE fires on_create_column_family on the old coordinator which
|
||||
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
|
||||
# via with_scheduling_group on the gossip scheduling group.
|
||||
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
|
||||
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
|
||||
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
|
||||
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
|
||||
|
||||
# Release injection B: coordinator proceeds through stop().
|
||||
# Without the fix, stop() returns quickly and run_topology_coordinator
|
||||
# frees the topology_coordinator frame. With the fix, stop() blocks at
|
||||
# _tablet_load_stats_refresh.join() until injection A is released.
|
||||
logger.info("Releasing injection B: coordinator will stop")
|
||||
await manager.api.message_injection(
|
||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
|
||||
|
||||
# Release injection A: refresh_tablet_load_stats() resumes and accesses
|
||||
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
|
||||
# points to freed memory and ASan detects heap-use-after-free.
|
||||
logger.info("Releasing injection A: refresh resumes")
|
||||
await manager.api.message_injection(
|
||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
|
||||
|
||||
# If the bug is present, the node crashed. read_barrier will fail.
|
||||
await read_barrier(manager.api, coord_server.ip_addr)
|
||||
|
||||
@@ -435,9 +435,8 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
async def get_replication_options(ks: str, host, ip_addr):
|
||||
await read_barrier(manager.api, ip_addr)
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
|
||||
async def get_replication_options(ks: str):
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
||||
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
|
||||
return repl
|
||||
|
||||
@@ -452,44 +451,43 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
|
||||
cql = manager.get_cql()
|
||||
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
|
||||
|
||||
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks1.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks1")
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
|
||||
await cql.run_async("create table ks2.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks2")
|
||||
assert repl['dc1'] == '1'
|
||||
assert repl['dc2'] == '2'
|
||||
|
||||
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
|
||||
await cql.run_async("create table ks3.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks3")
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
|
||||
await cql.run_async("create table ks4.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks4", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks4")
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks5.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks5")
|
||||
assert repl['dc1'] == '2'
|
||||
assert repl['dc2'] == '2'
|
||||
|
||||
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks6.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks6")
|
||||
assert repl['dc1'] == '2'
|
||||
|
||||
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
|
||||
|
||||
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks1")
|
||||
assert repl['dc1'] == ['rack1b']
|
||||
|
||||
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
|
||||
@@ -499,7 +497,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
assert r.replicas[0][0] == host_ids[1]
|
||||
|
||||
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
|
||||
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks2")
|
||||
assert repl['dc1'] == ['rack1a']
|
||||
assert len(repl['dc2']) == 2
|
||||
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
|
||||
@@ -525,13 +523,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
pass
|
||||
|
||||
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
|
||||
repl = await get_replication_options("ks5", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks5")
|
||||
assert len(repl['dc1']) == 2
|
||||
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
|
||||
assert repl['dc2'] == '2'
|
||||
|
||||
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
|
||||
repl = await get_replication_options("ks6", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks6")
|
||||
assert repl['dc1'] == '2'
|
||||
assert len(repl['dc2']) == 1
|
||||
assert repl['dc2'][0] == 'rack2a'
|
||||
@@ -539,9 +537,8 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
async def get_replication_options(ks: str, host, ip_addr):
|
||||
await read_barrier(manager.api, ip_addr)
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
|
||||
async def get_replication_options(ks: str):
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
||||
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
|
||||
return repl
|
||||
|
||||
@@ -554,11 +551,10 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
|
||||
await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
|
||||
|
||||
cql = manager.get_cql()
|
||||
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
|
||||
|
||||
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks1.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks1")
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")
|
||||
@@ -578,19 +574,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
|
||||
servers = servers[0:-1]
|
||||
|
||||
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks1")
|
||||
assert repl['dc1'] == ['rack1b']
|
||||
|
||||
logging.info("Rolling restart")
|
||||
await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])
|
||||
|
||||
await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
|
||||
repl = await get_replication_options("ks2", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks2")
|
||||
assert len(repl['dc1']) == 2
|
||||
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
|
||||
|
||||
await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
|
||||
repl = await get_replication_options("ks3", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks3")
|
||||
assert len(repl['dc1']) == 1
|
||||
assert len(repl['dc2']) == 1
|
||||
assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
|
||||
@@ -606,7 +602,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
|
||||
assert failed
|
||||
|
||||
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks1")
|
||||
assert len(repl['dc1']) == 1
|
||||
assert repl['dc1'][0] == 'rack1b'
|
||||
assert len(repl['dc2']) == 1
|
||||
@@ -1113,9 +1109,8 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
async def get_replication_options(ks: str, host, ip_addr):
|
||||
await read_barrier(manager.api, ip_addr)
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
|
||||
async def get_replication_options(ks: str):
|
||||
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
||||
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
|
||||
return repl
|
||||
|
||||
@@ -1133,11 +1128,10 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
|
||||
cql = manager.get_cql()
|
||||
host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]
|
||||
|
||||
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
|
||||
await cql.run_async("create table ks1.t (pk int primary key);")
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks1")
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
[await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]
|
||||
@@ -1171,7 +1165,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
|
||||
failed = True
|
||||
assert failed
|
||||
|
||||
repl = await get_replication_options("ks1", host, servers[0].ip_addr)
|
||||
repl = await get_replication_options("ks1")
|
||||
assert repl['dc1'] == '1'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
|
||||
#include "test/lib/exception_utils.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "ldap_common.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
@@ -682,41 +681,3 @@ SEASTAR_TEST_CASE(ldap_config) {
|
||||
},
|
||||
make_ldap_config());
|
||||
}
|
||||
|
||||
// Reproduces the race between the cache pruner and the permission
|
||||
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
|
||||
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
|
||||
auto cfg = make_ldap_config();
|
||||
cfg->permissions_update_interval_in_ms.set(1);
|
||||
|
||||
auto call_count = seastar::make_lw_shared<int>(0);
|
||||
|
||||
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
|
||||
auto& cache = env.auth_cache().local();
|
||||
|
||||
testlog.info("Populating 50 cache entries");
|
||||
for (int i = 0; i < 50; i++) {
|
||||
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
|
||||
cache.get_permissions(auth::role_or_anonymous(), r).get();
|
||||
}
|
||||
|
||||
testlog.info("Installing slow permission loader (10ms per call)");
|
||||
cache.set_permission_loader(
|
||||
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
|
||||
-> seastar::future<auth::permission_set> {
|
||||
++(*call_count);
|
||||
co_await seastar::sleep(std::chrono::milliseconds(10));
|
||||
co_return auth::permission_set();
|
||||
});
|
||||
|
||||
testlog.info("Waiting for pruner to start reloading");
|
||||
while (*call_count == 0) {
|
||||
seastar::sleep(std::chrono::milliseconds(1)).get();
|
||||
}
|
||||
|
||||
testlog.info("Pruner started, letting teardown run");
|
||||
}, cfg);
|
||||
|
||||
testlog.info("Loader called {} times", *call_count);
|
||||
}
|
||||
|
||||
|
||||
@@ -126,9 +126,6 @@ class CppFile(pytest.File, ABC):
|
||||
return args
|
||||
|
||||
def collect(self) -> Iterator[CppTestCase]:
|
||||
if BUILD_MODE not in self.stash:
|
||||
return
|
||||
|
||||
custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
|
||||
|
||||
for test_case in self.list_test_cases():
|
||||
|
||||
@@ -163,11 +163,6 @@ def scylla_binary(testpy_test) -> Path:
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
|
||||
items[:] = [
|
||||
item for item in items
|
||||
if (parent_file := item.getparent(cls=pytest.File)) is not None
|
||||
and BUILD_MODE in parent_file.stash
|
||||
]
|
||||
for item in items:
|
||||
modify_pytest_item(item=item)
|
||||
|
||||
@@ -290,10 +285,7 @@ def pytest_configure(config: pytest.Config) -> None:
|
||||
pytest_log_dir.mkdir(parents=True, exist_ok=True)
|
||||
if not _pytest_config.getoption("--save-log-on-success"):
|
||||
for file in pytest_log_dir.glob("*"):
|
||||
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them.
|
||||
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
|
||||
# condition, especially with repeat.
|
||||
file.unlink(missing_ok=True)
|
||||
file.unlink()
|
||||
|
||||
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
|
||||
|
||||
@@ -348,8 +340,7 @@ def pytest_collect_file(file_path: pathlib.Path,
|
||||
repeats = list(product(build_modes, parent.config.run_ids))
|
||||
|
||||
if not repeats:
|
||||
parent.stash[REPEATING_FILES].remove(file_path)
|
||||
return collectors
|
||||
return []
|
||||
|
||||
ihook = parent.ihook
|
||||
collectors = list(chain(collectors, chain.from_iterable(
|
||||
|
||||
@@ -75,7 +75,6 @@ def test_no_bare_skip_markers_in_collection():
|
||||
"--collect-only",
|
||||
"--ignore=boost", "--ignore=raft",
|
||||
"--ignore=ldap", "--ignore=vector_search",
|
||||
"--ignore=unit",
|
||||
"-p", "no:sugar"],
|
||||
capture_output=True, text=True,
|
||||
cwd=str(_TEST_ROOT),
|
||||
|
||||
Reference in New Issue
Block a user