mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 12:17:02 +00:00
Compare commits
2 Commits
next
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93fbc0a683 | ||
|
|
520466b407 |
@@ -258,11 +258,13 @@ future<> ldap_role_manager::start() {
|
|||||||
} catch (const seastar::sleep_aborted&) {
|
} catch (const seastar::sleep_aborted&) {
|
||||||
co_return; // ignore
|
co_return; // ignore
|
||||||
}
|
}
|
||||||
try {
|
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
|
||||||
co_await _cache.reload_all_permissions();
|
try {
|
||||||
} catch (...) {
|
co_await c.reload_all_permissions();
|
||||||
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
} catch (...) {
|
||||||
}
|
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return _std_mgr.start();
|
return _std_mgr.start();
|
||||||
|
|||||||
@@ -157,20 +157,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
|||||||
return create_legacy_keyspace_if_missing(mm);
|
return create_legacy_keyspace_if_missing(mm);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// Authorizer must be started before the permission loader is set,
|
|
||||||
// because the loader calls _authorizer->authorize().
|
|
||||||
// The loader must be set before starting the role manager, because
|
|
||||||
// LDAP role manager starts a pruner fiber that calls
|
|
||||||
// reload_all_permissions() which asserts _permission_loader is set.
|
|
||||||
co_await _authorizer->start();
|
|
||||||
if (!_used_by_maintenance_socket) {
|
|
||||||
// Maintenance socket mode can't cache permissions because it has
|
|
||||||
// different authorizer. We can't mix cached permissions, they could be
|
|
||||||
// different in normal mode.
|
|
||||||
_cache.set_permission_loader(std::bind(
|
|
||||||
&service::get_uncached_permissions,
|
|
||||||
this, std::placeholders::_1, std::placeholders::_2));
|
|
||||||
}
|
|
||||||
co_await _role_manager->start();
|
co_await _role_manager->start();
|
||||||
if (this_shard_id() == 0) {
|
if (this_shard_id() == 0) {
|
||||||
// Role manager and password authenticator have this odd startup
|
// Role manager and password authenticator have this odd startup
|
||||||
@@ -179,19 +165,21 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
|
|||||||
// creation therefore we need to wait here.
|
// creation therefore we need to wait here.
|
||||||
co_await _role_manager->ensure_superuser_is_created();
|
co_await _role_manager->ensure_superuser_is_created();
|
||||||
}
|
}
|
||||||
// Authenticator must be started after ensure_superuser_is_created()
|
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
|
||||||
// because password_authenticator queries system.roles for the
|
if (!_used_by_maintenance_socket) {
|
||||||
// superuser entry created by the role manager.
|
// Maintenance socket mode can't cache permissions because it has
|
||||||
co_await _authenticator->start();
|
// different authorizer. We can't mix cached permissions, they could be
|
||||||
|
// different in normal mode.
|
||||||
|
_cache.set_permission_loader(std::bind(
|
||||||
|
&service::get_uncached_permissions,
|
||||||
|
this, std::placeholders::_1, std::placeholders::_2));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> service::stop() {
|
future<> service::stop() {
|
||||||
_as.request_abort();
|
_as.request_abort();
|
||||||
// Reverse of start() order.
|
|
||||||
co_await _authenticator->stop();
|
|
||||||
co_await _role_manager->stop();
|
|
||||||
_cache.set_permission_loader(nullptr);
|
_cache.set_permission_loader(nullptr);
|
||||||
co_await _authorizer->stop();
|
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
|
||||||
}
|
}
|
||||||
|
|
||||||
future<> service::ensure_superuser_is_created() {
|
future<> service::ensure_superuser_is_created() {
|
||||||
|
|||||||
@@ -71,7 +71,7 @@ used. If it is used, the statement will be a no-op if the materialized view alre
|
|||||||
MV Select Statement
|
MV Select Statement
|
||||||
...................
|
...................
|
||||||
|
|
||||||
The select statement of a materialized view creation defines which of the base table is included in the view. That
|
The select statement of a materialized view creation defines which of the base table columns are included in the view. That
|
||||||
statement is limited in a number of ways:
|
statement is limited in a number of ways:
|
||||||
|
|
||||||
- The :ref:`selection <selection-clause>` is limited to those that only select columns of the base table. In other
|
- The :ref:`selection <selection-clause>` is limited to those that only select columns of the base table. In other
|
||||||
|
|||||||
@@ -4237,7 +4237,6 @@ public:
|
|||||||
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
||||||
, _async_gate("topology_coordinator")
|
, _async_gate("topology_coordinator")
|
||||||
{
|
{
|
||||||
_lifecycle_notifier.register_subscriber(this);
|
|
||||||
_db.get_notifier().register_listener(this);
|
_db.get_notifier().register_listener(this);
|
||||||
// When the delay_cdc_stream_finalization error injection is disabled
|
// When the delay_cdc_stream_finalization error injection is disabled
|
||||||
// (test releases it), wake the topology coordinator so it retries
|
// (test releases it), wake the topology coordinator so it retries
|
||||||
@@ -4401,7 +4400,6 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
|
|||||||
}
|
}
|
||||||
|
|
||||||
future<> topology_coordinator::refresh_tablet_load_stats() {
|
future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||||
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
|
|
||||||
auto tm = get_token_metadata_ptr();
|
auto tm = get_token_metadata_ptr();
|
||||||
|
|
||||||
locator::load_stats stats;
|
locator::load_stats stats;
|
||||||
@@ -4725,6 +4723,7 @@ future<> topology_coordinator::run() {
|
|||||||
|
|
||||||
co_await _async_gate.close();
|
co_await _async_gate.close();
|
||||||
co_await std::move(tablet_load_stats_refresher);
|
co_await std::move(tablet_load_stats_refresher);
|
||||||
|
co_await _tablet_load_stats_refresh.join();
|
||||||
co_await std::move(cdc_generation_publisher);
|
co_await std::move(cdc_generation_publisher);
|
||||||
co_await std::move(cdc_streams_gc);
|
co_await std::move(cdc_streams_gc);
|
||||||
co_await std::move(gossiper_orphan_remover);
|
co_await std::move(gossiper_orphan_remover);
|
||||||
@@ -4737,8 +4736,6 @@ future<> topology_coordinator::stop() {
|
|||||||
co_await _db.get_notifier().unregister_listener(this);
|
co_await _db.get_notifier().unregister_listener(this);
|
||||||
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
|
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
|
||||||
_topo_sm.on_tablet_split_ready = nullptr;
|
_topo_sm.on_tablet_split_ready = nullptr;
|
||||||
co_await _lifecycle_notifier.unregister_subscriber(this);
|
|
||||||
co_await _tablet_load_stats_refresh.join();
|
|
||||||
|
|
||||||
// if topology_coordinator::run() is aborted either because we are not a
|
// if topology_coordinator::run() is aborted either because we are not a
|
||||||
// leader anymore, or we are shutting down as a leader, we have to handle
|
// leader anymore, or we are shutting down as a leader, we have to handle
|
||||||
@@ -4800,6 +4797,7 @@ future<> run_topology_coordinator(
|
|||||||
topology_cmd_rpc_tracker};
|
topology_cmd_rpc_tracker};
|
||||||
|
|
||||||
std::exception_ptr ex;
|
std::exception_ptr ex;
|
||||||
|
lifecycle_notifier.register_subscriber(&coordinator);
|
||||||
try {
|
try {
|
||||||
rtlogger.info("start topology coordinator fiber");
|
rtlogger.info("start topology coordinator fiber");
|
||||||
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
|
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
|
||||||
@@ -4820,7 +4818,7 @@ future<> run_topology_coordinator(
|
|||||||
}
|
}
|
||||||
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
|
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
|
||||||
}
|
}
|
||||||
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
|
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
|
||||||
co_await coordinator.stop();
|
co_await coordinator.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
|
|||||||
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
|
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
|
||||||
from test.pylib.tablets import get_all_tablet_replicas
|
from test.pylib.tablets import get_all_tablet_replicas
|
||||||
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
||||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
|
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
|
||||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||||
|
|
||||||
from cassandra.query import ConsistencyLevel, SimpleStatement
|
from cassandra.query import ConsistencyLevel, SimpleStatement
|
||||||
@@ -880,30 +880,41 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
|
|||||||
# affected replica but process the UNREPAIRED sstable on the others, so the classification
|
# affected replica but process the UNREPAIRED sstable on the others, so the classification
|
||||||
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
|
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
|
||||||
# on the affected replica leading to data resurrection.
|
# on the affected replica leading to data resurrection.
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||||
|
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
||||||
|
cmdline = ['--hinted-handoff-enabled', '0']
|
||||||
|
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
||||||
|
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
||||||
|
|
||||||
class _LeadershipTransferred(Exception):
|
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
||||||
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
|
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
||||||
pass
|
await cql.run_async(
|
||||||
|
f"ALTER TABLE {ks}.test WITH compaction = "
|
||||||
|
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
||||||
|
)
|
||||||
|
|
||||||
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
|
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
||||||
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
|
for s in servers:
|
||||||
|
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
||||||
|
|
||||||
|
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
||||||
|
|
||||||
|
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
||||||
|
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
||||||
|
# S0'(repaired_at=1) on all nodes.
|
||||||
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||||
|
|
||||||
|
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
|
||||||
|
# These will be the subject of repair 2.
|
||||||
|
repair2_keys = list(range(current_key, current_key + 10))
|
||||||
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
||||||
|
for s in servers:
|
||||||
|
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||||
|
current_key += 10
|
||||||
|
|
||||||
Returns the next current_key value.
|
|
||||||
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
|
|
||||||
restart, signalling the caller to retry.
|
|
||||||
"""
|
|
||||||
# Ensure servers[1] is not the topology coordinator. If the coordinator is
|
|
||||||
# restarted, the Raft leader dies, a new election occurs, and the new
|
|
||||||
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
|
|
||||||
# and marking post-repair data as repaired. That legitimate re-repair masks
|
|
||||||
# the compaction-merge bug this test detects.
|
|
||||||
coord = await get_topology_coordinator(manager)
|
coord = await get_topology_coordinator(manager)
|
||||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||||
if coord_serv == servers[1]:
|
|
||||||
other = next(s for s in servers if s != servers[1])
|
|
||||||
await ensure_group0_leader_on(manager, other)
|
|
||||||
coord = await get_topology_coordinator(manager)
|
|
||||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
|
||||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||||
coord_mark = await coord_log.mark()
|
coord_mark = await coord_log.mark()
|
||||||
|
|
||||||
@@ -967,16 +978,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
|||||||
await manager.server_start(target.server_id)
|
await manager.server_start(target.server_id)
|
||||||
await manager.servers_see_each_other(servers)
|
await manager.servers_see_each_other(servers)
|
||||||
|
|
||||||
# Check if leadership transferred to servers[1] during the restart.
|
|
||||||
# If so, the new coordinator will re-initiate repair, masking the bug.
|
|
||||||
new_coord = await get_topology_coordinator(manager)
|
|
||||||
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
|
|
||||||
if new_coord_serv == servers[1]:
|
|
||||||
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
|
|
||||||
await manager.api.wait_task(servers[0].ip_addr, task_id)
|
|
||||||
raise _LeadershipTransferred(
|
|
||||||
"servers[1] became topology coordinator after restart")
|
|
||||||
|
|
||||||
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
|
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
|
||||||
# confirming that the bug was triggered (S1' and E merged during the race window).
|
# confirming that the bug was triggered (S1' and E merged during the race window).
|
||||||
deadline = time.time() + 60
|
deadline = time.time() + 60
|
||||||
@@ -999,7 +1000,7 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
|||||||
if not compaction_ran:
|
if not compaction_ran:
|
||||||
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
|
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
|
||||||
"the bug was not triggered. Skipping assertion.")
|
"the bug was not triggered. Skipping assertion.")
|
||||||
return current_key
|
return
|
||||||
|
|
||||||
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
|
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
|
||||||
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
|
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
|
||||||
@@ -1030,9 +1031,8 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
|||||||
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
|
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
|
||||||
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
|
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
|
||||||
|
|
||||||
# servers[0] and servers[2] were never restarted and the coordinator stayed
|
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
|
||||||
# alive throughout, so no re-repair could have flushed their memtables.
|
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
|
||||||
# Post-repair keys must NOT appear in repaired sstables on these servers.
|
|
||||||
assert not (repaired_keys_0 & post_repair_key_set), \
|
assert not (repaired_keys_0 & post_repair_key_set), \
|
||||||
f"servers[0] should not have post-repair keys in repaired sstables, " \
|
f"servers[0] should not have post-repair keys in repaired sstables, " \
|
||||||
f"got: {repaired_keys_0 & post_repair_key_set}"
|
f"got: {repaired_keys_0 & post_repair_key_set}"
|
||||||
@@ -1053,54 +1053,6 @@ async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, to
|
|||||||
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
|
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
|
||||||
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
|
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
|
||||||
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
|
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
|
||||||
return current_key
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
|
||||||
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
|
|
||||||
cmdline = ['--hinted-handoff-enabled', '0']
|
|
||||||
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
|
|
||||||
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
|
|
||||||
|
|
||||||
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
|
|
||||||
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
|
|
||||||
await cql.run_async(
|
|
||||||
f"ALTER TABLE {ks}.test WITH compaction = "
|
|
||||||
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Disable autocompaction everywhere so we control exactly when compaction runs.
|
|
||||||
for s in servers:
|
|
||||||
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
|
|
||||||
|
|
||||||
scylla_path = await manager.server_get_exe(servers[0].server_id)
|
|
||||||
|
|
||||||
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
|
|
||||||
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
|
|
||||||
# S0'(repaired_at=1) on all nodes.
|
|
||||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
|
||||||
|
|
||||||
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
|
|
||||||
# These will be the subject of repair 2.
|
|
||||||
repair2_keys = list(range(current_key, current_key + 10))
|
|
||||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
|
|
||||||
for s in servers:
|
|
||||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
|
||||||
current_key += 10
|
|
||||||
|
|
||||||
# If leadership transfers to servers[1] between our coordinator check and the
|
|
||||||
# restart, the coordinator change masks the bug. Detect and retry.
|
|
||||||
max_attempts = 5
|
|
||||||
for attempt in range(1, max_attempts + 1):
|
|
||||||
try:
|
|
||||||
current_key = await _do_race_window_promotes_unrepaired_data(
|
|
||||||
manager, servers, cql, ks, token, scylla_path, current_key)
|
|
||||||
return
|
|
||||||
except _LeadershipTransferred as e:
|
|
||||||
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
|
|
||||||
|
|
||||||
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
|
|
||||||
"could not run the test without coordinator interference.")
|
|
||||||
|
|
||||||
# ----------------------------------------------------------------------------
|
# ----------------------------------------------------------------------------
|
||||||
# Tombstone GC safety tests
|
# Tombstone GC safety tests
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||||
#
|
#
|
||||||
from test.pylib.manager_client import ManagerClient
|
from test.pylib.manager_client import ManagerClient
|
||||||
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
|
from test.cluster.util import get_topology_coordinator, trigger_stepdown
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import logging
|
import logging
|
||||||
@@ -83,78 +83,3 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
|
|||||||
coord3 = await get_topology_coordinator(manager)
|
coord3 = await get_topology_coordinator(manager)
|
||||||
if coord3:
|
if coord3:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
|
||||||
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
|
|
||||||
"""Verify that _tablet_load_stats_refresh is properly joined during
|
|
||||||
topology coordinator shutdown, even when a schema change notification
|
|
||||||
triggers a refresh between run() completing and stop() being called.
|
|
||||||
|
|
||||||
Reproduces the scenario using two injection points:
|
|
||||||
- topology_coordinator_pause_before_stop: pauses after run() finishes
|
|
||||||
but before stop() is called
|
|
||||||
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
|
|
||||||
so it's still in-flight during shutdown
|
|
||||||
|
|
||||||
Without the join in stop(), the refresh task outlives the coordinator
|
|
||||||
and accesses freed memory.
|
|
||||||
"""
|
|
||||||
servers = await manager.servers_add(3)
|
|
||||||
await manager.get_ready_cql(servers)
|
|
||||||
|
|
||||||
async with new_test_keyspace(manager,
|
|
||||||
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
|
||||||
coord = await get_topology_coordinator(manager)
|
|
||||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
|
||||||
coord_idx = host_ids.index(coord)
|
|
||||||
coord_server = servers[coord_idx]
|
|
||||||
|
|
||||||
log = await manager.server_open_log(coord_server.server_id)
|
|
||||||
mark = await log.mark()
|
|
||||||
|
|
||||||
# Injection B: pause between run() returning and stop() being called.
|
|
||||||
await manager.api.enable_injection(
|
|
||||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
|
|
||||||
|
|
||||||
# Stepdown causes the topology coordinator to abort and shut down.
|
|
||||||
logger.info("Triggering stepdown on coordinator")
|
|
||||||
await trigger_stepdown(manager, coord_server)
|
|
||||||
|
|
||||||
# Wait for injection B to fire. The coordinator has finished run() but
|
|
||||||
# the schema change listener is still registered.
|
|
||||||
mark, _ = await log.wait_for(
|
|
||||||
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
|
|
||||||
|
|
||||||
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
|
|
||||||
# Enable it now so it only catches the notification-triggered call.
|
|
||||||
await manager.api.enable_injection(
|
|
||||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
|
|
||||||
|
|
||||||
# CREATE TABLE fires on_create_column_family on the old coordinator which
|
|
||||||
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
|
|
||||||
# via with_scheduling_group on the gossip scheduling group.
|
|
||||||
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
|
|
||||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
|
|
||||||
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
|
|
||||||
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
|
|
||||||
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
|
|
||||||
|
|
||||||
# Release injection B: coordinator proceeds through stop().
|
|
||||||
# Without the fix, stop() returns quickly and run_topology_coordinator
|
|
||||||
# frees the topology_coordinator frame. With the fix, stop() blocks at
|
|
||||||
# _tablet_load_stats_refresh.join() until injection A is released.
|
|
||||||
logger.info("Releasing injection B: coordinator will stop")
|
|
||||||
await manager.api.message_injection(
|
|
||||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
|
|
||||||
|
|
||||||
# Release injection A: refresh_tablet_load_stats() resumes and accesses
|
|
||||||
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
|
|
||||||
# points to freed memory and ASan detects heap-use-after-free.
|
|
||||||
logger.info("Releasing injection A: refresh resumes")
|
|
||||||
await manager.api.message_injection(
|
|
||||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
|
|
||||||
|
|
||||||
# If the bug is present, the node crashed. read_barrier will fail.
|
|
||||||
await read_barrier(manager.api, coord_server.ip_addr)
|
|
||||||
|
|||||||
@@ -18,7 +18,6 @@
|
|||||||
#include <seastar/testing/test_case.hh>
|
#include <seastar/testing/test_case.hh>
|
||||||
|
|
||||||
#include "test/lib/exception_utils.hh"
|
#include "test/lib/exception_utils.hh"
|
||||||
#include "test/lib/log.hh"
|
|
||||||
#include "test/lib/test_utils.hh"
|
#include "test/lib/test_utils.hh"
|
||||||
#include "ldap_common.hh"
|
#include "ldap_common.hh"
|
||||||
#include "service/migration_manager.hh"
|
#include "service/migration_manager.hh"
|
||||||
@@ -682,41 +681,3 @@ SEASTAR_TEST_CASE(ldap_config) {
|
|||||||
},
|
},
|
||||||
make_ldap_config());
|
make_ldap_config());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reproduces the race between the cache pruner and the permission
|
|
||||||
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
|
|
||||||
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
|
|
||||||
auto cfg = make_ldap_config();
|
|
||||||
cfg->permissions_update_interval_in_ms.set(1);
|
|
||||||
|
|
||||||
auto call_count = seastar::make_lw_shared<int>(0);
|
|
||||||
|
|
||||||
co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
|
|
||||||
auto& cache = env.auth_cache().local();
|
|
||||||
|
|
||||||
testlog.info("Populating 50 cache entries");
|
|
||||||
for (int i = 0; i < 50; i++) {
|
|
||||||
auto r = auth::make_data_resource("system", fmt::format("t{}", i));
|
|
||||||
cache.get_permissions(auth::role_or_anonymous(), r).get();
|
|
||||||
}
|
|
||||||
|
|
||||||
testlog.info("Installing slow permission loader (10ms per call)");
|
|
||||||
cache.set_permission_loader(
|
|
||||||
[call_count] (const auth::role_or_anonymous&, const auth::resource&)
|
|
||||||
-> seastar::future<auth::permission_set> {
|
|
||||||
++(*call_count);
|
|
||||||
co_await seastar::sleep(std::chrono::milliseconds(10));
|
|
||||||
co_return auth::permission_set();
|
|
||||||
});
|
|
||||||
|
|
||||||
testlog.info("Waiting for pruner to start reloading");
|
|
||||||
while (*call_count == 0) {
|
|
||||||
seastar::sleep(std::chrono::milliseconds(1)).get();
|
|
||||||
}
|
|
||||||
|
|
||||||
testlog.info("Pruner started, letting teardown run");
|
|
||||||
}, cfg);
|
|
||||||
|
|
||||||
testlog.info("Loader called {} times", *call_count);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|||||||
@@ -126,9 +126,6 @@ class CppFile(pytest.File, ABC):
|
|||||||
return args
|
return args
|
||||||
|
|
||||||
def collect(self) -> Iterator[CppTestCase]:
|
def collect(self) -> Iterator[CppTestCase]:
|
||||||
if BUILD_MODE not in self.stash:
|
|
||||||
return
|
|
||||||
|
|
||||||
custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
|
custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)
|
||||||
|
|
||||||
for test_case in self.list_test_cases():
|
for test_case in self.list_test_cases():
|
||||||
|
|||||||
@@ -163,11 +163,6 @@ def scylla_binary(testpy_test) -> Path:
|
|||||||
|
|
||||||
|
|
||||||
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
|
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
|
||||||
items[:] = [
|
|
||||||
item for item in items
|
|
||||||
if (parent_file := item.getparent(cls=pytest.File)) is not None
|
|
||||||
and BUILD_MODE in parent_file.stash
|
|
||||||
]
|
|
||||||
for item in items:
|
for item in items:
|
||||||
modify_pytest_item(item=item)
|
modify_pytest_item(item=item)
|
||||||
|
|
||||||
@@ -290,10 +285,7 @@ def pytest_configure(config: pytest.Config) -> None:
|
|||||||
pytest_log_dir.mkdir(parents=True, exist_ok=True)
|
pytest_log_dir.mkdir(parents=True, exist_ok=True)
|
||||||
if not _pytest_config.getoption("--save-log-on-success"):
|
if not _pytest_config.getoption("--save-log-on-success"):
|
||||||
for file in pytest_log_dir.glob("*"):
|
for file in pytest_log_dir.glob("*"):
|
||||||
# This will help in case framework tests are executed with test.py event if it's the wrong way to run them.
|
file.unlink()
|
||||||
# test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has lead to race
|
|
||||||
# condition, especially with repeat.
|
|
||||||
file.unlink(missing_ok=True)
|
|
||||||
|
|
||||||
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
|
_pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"
|
||||||
|
|
||||||
@@ -348,8 +340,7 @@ def pytest_collect_file(file_path: pathlib.Path,
|
|||||||
repeats = list(product(build_modes, parent.config.run_ids))
|
repeats = list(product(build_modes, parent.config.run_ids))
|
||||||
|
|
||||||
if not repeats:
|
if not repeats:
|
||||||
parent.stash[REPEATING_FILES].remove(file_path)
|
return []
|
||||||
return collectors
|
|
||||||
|
|
||||||
ihook = parent.ihook
|
ihook = parent.ihook
|
||||||
collectors = list(chain(collectors, chain.from_iterable(
|
collectors = list(chain(collectors, chain.from_iterable(
|
||||||
|
|||||||
@@ -75,7 +75,6 @@ def test_no_bare_skip_markers_in_collection():
|
|||||||
"--collect-only",
|
"--collect-only",
|
||||||
"--ignore=boost", "--ignore=raft",
|
"--ignore=boost", "--ignore=raft",
|
||||||
"--ignore=ldap", "--ignore=vector_search",
|
"--ignore=ldap", "--ignore=vector_search",
|
||||||
"--ignore=unit",
|
|
||||||
"-p", "no:sugar"],
|
"-p", "no:sugar"],
|
||||||
capture_output=True, text=True,
|
capture_output=True, text=True,
|
||||||
cwd=str(_TEST_ROOT),
|
cwd=str(_TEST_ROOT),
|
||||||
|
|||||||
Reference in New Issue
Block a user