mirror of https://github.com/scylladb/scylladb.git

Compare commits: fix-invali... → master (18 commits)
| SHA1 |
|---|
| c4de2b3c9d |
| d9dd3bfe53 |
| b0f988afc4 |
| a7e9c0e6d2 |
| 3ea4af1c8c |
| 459e3970cd |
| 8756f7c068 |
| 2615d0e8d8 |
| 914b70c75b |
| 6b7ce5e244 |
| 9d3d424d58 |
| f2f4915e09 |
| 92c09d106d |
| 8855e77465 |
| adf1e26bab |
| 37a547604f |
| c3e5285d45 |
| f75e5ac65b |
@@ -258,13 +258,11 @@ future<> ldap_role_manager::start() {
        } catch (const seastar::sleep_aborted&) {
            co_return; // ignore
        }
        co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
            try {
                co_await c.reload_all_permissions();
            } catch (...) {
                mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
            }
        });
        try {
            co_await _cache.reload_all_permissions();
        } catch (...) {
            mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
        }
    }
    });
    return _std_mgr.start();

@@ -157,15 +157,12 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
            return create_legacy_keyspace_if_missing(mm);
        });
    }
    co_await _role_manager->start();
    if (this_shard_id() == 0) {
        // Role manager and password authenticator have this odd startup
        // mechanism where they asynchronously create the superuser role
        // in the background. Correct password creation depends on role
        // creation therefore we need to wait here.
        co_await _role_manager->ensure_superuser_is_created();
    }
    co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
    // Authorizer must be started before the permission loader is set,
    // because the loader calls _authorizer->authorize().
    // The loader must be set before starting the role manager, because
    // LDAP role manager starts a pruner fiber that calls
    // reload_all_permissions() which asserts _permission_loader is set.
    co_await _authorizer->start();
    if (!_used_by_maintenance_socket) {
        // Maintenance socket mode can't cache permissions because it has
        // different authorizer. We can't mix cached permissions, they could be

@@ -174,12 +171,27 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
                &service::get_uncached_permissions,
                this, std::placeholders::_1, std::placeholders::_2));
    }
    co_await _role_manager->start();
    if (this_shard_id() == 0) {
        // Role manager and password authenticator have this odd startup
        // mechanism where they asynchronously create the superuser role
        // in the background. Correct password creation depends on role
        // creation therefore we need to wait here.
        co_await _role_manager->ensure_superuser_is_created();
    }
    // Authenticator must be started after ensure_superuser_is_created()
    // because password_authenticator queries system.roles for the
    // superuser entry created by the role manager.
    co_await _authenticator->start();
}

future<> service::stop() {
    _as.request_abort();
    // Reverse of start() order.
    co_await _authenticator->stop();
    co_await _role_manager->stop();
    _cache.set_permission_loader(nullptr);
    return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
    co_await _authorizer->stop();
}

future<> service::ensure_superuser_is_created() {
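The comments in these hunks pin down a strict dependency order: authorizer before the permission loader, the loader before the role manager (whose LDAP pruner fiber asserts the loader is set), the superuser role before the authenticator, and the exact reverse on shutdown. Below is a minimal asyncio sketch of that ordering contract; every name is an illustrative stand-in, not ScyllaDB's Seastar C++ code.

```python
# Sketch of the start/stop dependency order described above (illustrative
# names only; the real implementation is Seastar C++ coroutines).
import asyncio


class Component:
    def __init__(self, name: str) -> None:
        self.name = name

    async def start(self) -> None:
        print(f"start {self.name}")

    async def stop(self) -> None:
        print(f"stop {self.name}")


class AuthService:
    def __init__(self) -> None:
        self.authorizer = Component("authorizer")
        self.role_manager = Component("role_manager")
        self.authenticator = Component("authenticator")
        self.permission_loader = None  # stands in for the cache's loader slot

    async def start(self) -> None:
        # 1. Authorizer first: the permission loader calls authorize().
        await self.authorizer.start()
        # 2. Loader before the role manager: the pruner fiber the role
        #    manager spawns asserts the loader is already set.
        self.permission_loader = lambda role, resource: asyncio.sleep(0)
        await self.role_manager.start()
        # 3. Superuser before the authenticator, which reads the role row
        #    the role manager creates in the background.
        await self.ensure_superuser_is_created()
        await self.authenticator.start()

    async def ensure_superuser_is_created(self) -> None:
        print("superuser ready")

    async def stop(self) -> None:
        # Exact reverse of start(), so nothing outlives what it depends on.
        await self.authenticator.stop()
        await self.role_manager.stop()
        self.permission_loader = None
        await self.authorizer.stop()


asyncio.run(AuthService().start())
```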
@@ -45,7 +45,7 @@ Example:

.. code-block:: console

   nodetool removenode 675ed9f4-6564-6dbd-ca08-43fddce952de
   nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy

To only mark the node as permanently down without doing actual removal, use :doc:`nodetool excludenode </operating-scylla/nodetool-commands/excludenode>`:

@@ -79,6 +79,6 @@ Example:

.. code-block:: console

   nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1db0-aac8-43fddce9123e 675ed9f4-6564-6dbd-ca08-43fddce952de
   nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e 675ed9f4-6564-6dbd-can8-43fddce952gy

.. include:: nodetool-index.rst

@@ -74,7 +74,7 @@ Procedure

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  B1
   UJ  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   UJ  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

Nodes in the cluster finished streaming data to the new node:

@@ -86,7 +86,7 @@ Procedure

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

#. When the new node status is Up Normal (UN), run the :doc:`nodetool cleanup </operating-scylla/nodetool-commands/cleanup>` command on all nodes in the cluster except for the new node that has just been added. Cleanup removes keys that were streamed to the newly added node and are no longer owned by the node.

@@ -192,7 +192,7 @@ Adding new nodes

   --  Address       Load    Tokens  Owns   Host ID                               Rack
   UN  192.168.1.10  500 MB  256     33.3%  8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  RACK0
   UN  192.168.1.11  500 MB  256     33.3%  125ed9f4-7777-1dbn-mac8-43fddce9123e  RACK1
   UN  192.168.1.12  500 MB  256     33.3%  675ed9f4-6564-6dbd-ca08-43fddce952de  RACK2
   UN  192.168.1.12  500 MB  256     33.3%  675ed9f4-6564-6dbd-can8-43fddce952gy  RACK2
   UJ  192.168.2.10  250 MB  256     ?      a1b2c3d4-5678-90ab-cdef-112233445566  RACK0

**Example output after bootstrap completes:**

@@ -205,7 +205,7 @@ Adding new nodes

   --  Address       Load    Tokens  Owns   Host ID                               Rack
   UN  192.168.1.10  400 MB  256     25.0%  8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  RACK0
   UN  192.168.1.11  400 MB  256     25.0%  125ed9f4-7777-1dbn-mac8-43fddce9123e  RACK1
   UN  192.168.1.12  400 MB  256     25.0%  675ed9f4-6564-6dbd-ca08-43fddce952de  RACK2
   UN  192.168.1.12  400 MB  256     25.0%  675ed9f4-6564-6dbd-can8-43fddce952gy  RACK2
   UN  192.168.2.10  400 MB  256     25.0%  a1b2c3d4-5678-90ab-cdef-112233445566  RACK0

#. For tablets-enabled clusters, wait for tablet load balancing to complete.

@@ -163,5 +163,5 @@ This example shows how to install and configure a three-node cluster using Gossi

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  43
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  44
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  45
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  45

@@ -19,7 +19,7 @@ Prerequisites

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-lac8-23fddce9123e  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

   Datacenter: ASIA-DC
   Status=Up/Down

@@ -165,7 +165,7 @@ Procedure

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

   Datacenter: EUROPE-DC
   Status=Up/Down

@@ -18,7 +18,7 @@ Removing a Running Node

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   UN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

#. If the node status is **Up Normal (UN)**, run the :doc:`nodetool decommission </operating-scylla/nodetool-commands/decommission>` command
   to remove the node you are connected to. Using ``nodetool decommission`` is the recommended method for cluster scale-down operations. It prevents data loss

@@ -75,7 +75,7 @@ command providing the Host ID of the node you are removing. See :doc:`nodetool r

.. code-block:: console

   nodetool removenode 675ed9f4-6564-6dbd-ca08-43fddce952de
   nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy

The ``nodetool removenode`` command notifies other nodes that the token range it owns needs to be moved and
the nodes should redistribute the data using streaming. Using the command does not guarantee the consistency of the rebalanced data if

@@ -23,7 +23,7 @@ Prerequisites

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   DN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  B1
   DN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   DN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

Login to one of the nodes in the cluster with (UN) status, collect the following info from the node:

@@ -29,7 +29,7 @@ Down (DN), and the node can be replaced.

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  B1
   DN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   DN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

Remove the Data
==================

@@ -72,7 +72,7 @@ Procedure

   For example (using the Host ID of the failed node from above):

   ``replace_node_first_boot: 675ed9f4-6564-6dbd-ca08-43fddce952de``
   ``replace_node_first_boot: 675ed9f4-6564-6dbd-can8-43fddce952gy``

#. Start the new node.

@@ -90,7 +90,7 @@ Procedure

   --  Address        Load       Tokens  Owns (effective)  Host ID                               Rack
   UN  192.168.1.201  112.82 KB  256     32.7%             8d5ed9f4-7764-4dbd-bad8-43fddce94b7c  B1
   UN  192.168.1.202  91.11 KB   256     32.9%             125ed9f4-7777-1dbn-mac8-43fddce9123e  B1
   DN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-ca08-43fddce952de  B1
   DN  192.168.1.203  124.42 KB  256     32.6%             675ed9f4-6564-6dbd-can8-43fddce952gy  B1

``192.168.1.203`` is the dead node.

@@ -121,7 +121,7 @@ Procedure

   /192.168.1.203
     generation:1553759866
     heartbeat:2147483647
     HOST_ID:675ed9f4-6564-6dbd-ca08-43fddce952de
     HOST_ID:675ed9f4-6564-6dbd-can8-43fddce952gy
     STATUS:shutdown,true
     RELEASE_VERSION:3.0.8
     X3:3

@@ -178,7 +178,7 @@ In this case, the node's data will be cleaned after restart. To remedy this, you

.. code-block:: none

   echo 'replace_node_first_boot: 675ed9f4-6564-6dbd-ca08-43fddce952de' | sudo tee --append /etc/scylla/scylla.yaml
   echo 'replace_node_first_boot: 675ed9f4-6564-6dbd-can8-43fddce952gy' | sudo tee --append /etc/scylla/scylla.yaml

#. Run the following command to re-setup RAID
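A note on the paired host IDs throughout these documentation hunks: a Scylla Host ID is a UUID, so every group must be hexadecimal, and strings such as ``675ed9f4-6564-6dbd-can8-43fddce952gy`` (containing ``n``, ``g``, ``y``) cannot be well-formed. A small, hypothetical Python check, not part of any Scylla tooling, that catches such values before they reach ``nodetool removenode``:

```python
# Illustrative helper (invented for this note): verify that a host ID is a
# well-formed UUID before passing it to `nodetool removenode`.
import uuid


def is_valid_host_id(host_id: str) -> bool:
    try:
        # uuid.UUID rejects any non-hex digit in the five groups.
        return str(uuid.UUID(host_id)) == host_id.lower()
    except ValueError:
        return False


assert is_valid_host_id("675ed9f4-6564-6dbd-ca08-43fddce952de")
assert not is_valid_host_id("675ed9f4-6564-6dbd-can8-43fddce952gy")  # 'n', 'g', 'y' are not hex
```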
@@ -4237,6 +4237,7 @@ public:
        , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
        , _async_gate("topology_coordinator")
    {
        _lifecycle_notifier.register_subscriber(this);
        _db.get_notifier().register_listener(this);
        // When the delay_cdc_stream_finalization error injection is disabled
        // (test releases it), wake the topology coordinator so it retries

@@ -4400,6 +4401,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
}

future<> topology_coordinator::refresh_tablet_load_stats() {
    co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
    auto tm = get_token_metadata_ptr();

    locator::load_stats stats;

@@ -4723,7 +4725,6 @@ future<> topology_coordinator::run() {

    co_await _async_gate.close();
    co_await std::move(tablet_load_stats_refresher);
    co_await _tablet_load_stats_refresh.join();
    co_await std::move(cdc_generation_publisher);
    co_await std::move(cdc_streams_gc);
    co_await std::move(gossiper_orphan_remover);

@@ -4736,6 +4737,8 @@ future<> topology_coordinator::stop() {
    co_await _db.get_notifier().unregister_listener(this);
    utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
    _topo_sm.on_tablet_split_ready = nullptr;
    co_await _lifecycle_notifier.unregister_subscriber(this);
    co_await _tablet_load_stats_refresh.join();

    // if topology_coordinator::run() is aborted either because we are not a
    // leader anymore, or we are shutting down as a leader, we have to handle

@@ -4797,7 +4800,6 @@ future<> run_topology_coordinator(
        topology_cmd_rpc_tracker};

    std::exception_ptr ex;
    lifecycle_notifier.register_subscriber(&coordinator);
    try {
        rtlogger.info("start topology coordinator fiber");
        co_await with_scheduling_group(group0.get_scheduling_group(), [&] {

@@ -4818,7 +4820,7 @@ future<> run_topology_coordinator(
    }
    on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
}
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
co_await coordinator.stop();
}
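The key movement in these hunks is that the join of `_tablet_load_stats_refresh` leaves `run()` and lands in `stop()`, after listeners are unregistered, so a notification-triggered refresh cannot outlive the coordinator. A compact asyncio model of why that join must happen before the owner is destroyed; the names are stand-ins for the Seastar originals, not the real API.

```python
# Asyncio model of the shutdown bug fixed above (illustrative names): a
# notification-triggered background refresh must be joined in stop(), or it
# can still run after the coordinator's state has been freed.
import asyncio


class Coordinator:
    def __init__(self) -> None:
        self._refresh_task: asyncio.Task | None = None
        self._shared_tm = {"tokens": []}  # freed together with the coordinator

    def on_create_column_family(self) -> None:
        # Fire-and-forget trigger, like the schema-change listener.
        self._refresh_task = asyncio.create_task(self._refresh())

    async def _refresh(self) -> None:
        await asyncio.sleep(0.01)            # stands in for the injected pause
        self._shared_tm["tokens"].append(1)  # use-after-free if never joined

    async def stop(self) -> None:
        if self._refresh_task is not None:
            await self._refresh_task         # the join added by the fix


async def main() -> None:
    c = Coordinator()
    c.on_create_column_family()  # lands between run() ending and stop()
    await c.stop()               # with the join, the refresh finishes first


asyncio.run(main())
```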
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts

from cassandra.query import ConsistencyLevel, SimpleStatement

@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
    cmdline = ['--hinted-handoff-enabled', '0']
    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)

    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
    await cql.run_async(
        f"ALTER TABLE {ks}.test WITH compaction = "
        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
    )
class _LeadershipTransferred(Exception):
    """Raised when leadership transferred to servers[1] during the test, requiring a retry."""
    pass

    # Disable autocompaction everywhere so we control exactly when compaction runs.
    for s in servers:
        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')

    scylla_path = await manager.server_get_exe(servers[0].server_id)

    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
    # Keys 0-9 (inserted by prepare_cluster_for_incremental_repair) end up in
    # S0'(repaired_at=1) on all nodes.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

    # Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
    # These will be the subject of repair 2.
    repair2_keys = list(range(current_key, current_key + 10))
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)
    current_key += 10
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
    """Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.

    Returns the next current_key value.
    Raises _LeadershipTransferred if servers[1] becomes coordinator after the
    restart, signalling the caller to retry.
    """
    # Ensure servers[1] is not the topology coordinator. If the coordinator is
    # restarted, the Raft leader dies, a new election occurs, and the new
    # coordinator re-initiates tablet repair -- flushing memtables on all replicas
    # and marking post-repair data as repaired. That legitimate re-repair masks
    # the compaction-merge bug this test detects.
    coord = await get_topology_coordinator(manager)
    coord_serv = await find_server_by_host_id(manager, servers, coord)
    if coord_serv == servers[1]:
        other = next(s for s in servers if s != servers[1])
        await ensure_group0_leader_on(manager, other)
        coord = await get_topology_coordinator(manager)
        coord_serv = await find_server_by_host_id(manager, servers, coord)
    coord_log = await manager.server_open_log(coord_serv.server_id)
    coord_mark = await coord_log.mark()

@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
    await manager.server_start(target.server_id)
    await manager.servers_see_each_other(servers)

    # Check if leadership transferred to servers[1] during the restart.
    # If so, the new coordinator will re-initiate repair, masking the bug.
    new_coord = await get_topology_coordinator(manager)
    new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
    if new_coord_serv == servers[1]:
        await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
        await manager.api.wait_task(servers[0].ip_addr, task_id)
        raise _LeadershipTransferred(
            "servers[1] became topology coordinator after restart")

    # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
    # confirming that the bug was triggered (S1' and E merged during the race window).
    deadline = time.time() + 60

@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
    if not compaction_ran:
        logger.warning("Compaction did not merge S1' and E after restart during the race window; "
                       "the bug was not triggered. Skipping assertion.")
        return
        return current_key

    # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
    # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.

@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
                f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
                f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")

    # servers[0] and servers[2] flushed post-repair keys after the race window closed,
    # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
    # servers[0] and servers[2] were never restarted and the coordinator stayed
    # alive throughout, so no re-repair could have flushed their memtables.
    # Post-repair keys must NOT appear in repaired sstables on these servers.
    assert not (repaired_keys_0 & post_repair_key_set), \
        f"servers[0] should not have post-repair keys in repaired sstables, " \
        f"got: {repaired_keys_0 & post_repair_key_set}"

@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
        f"on servers[1] after restart lost the being_repaired markers during the race window. " \
        f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
        f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
    return current_key


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
    cmdline = ['--hinted-handoff-enabled', '0']
    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)

    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
    await cql.run_async(
        f"ALTER TABLE {ks}.test WITH compaction = "
        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
    )

    # Disable autocompaction everywhere so we control exactly when compaction runs.
    for s in servers:
        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')

    scylla_path = await manager.server_get_exe(servers[0].server_id)

    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
    # Keys 0-9 (inserted by prepare_cluster_for_incremental_repair) end up in
    # S0'(repaired_at=1) on all nodes.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

    # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
    # These will be the subject of repair 2.
    repair2_keys = list(range(current_key, current_key + 10))
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)
    current_key += 10

    # If leadership transfers to servers[1] between our coordinator check and the
    # restart, the coordinator change masks the bug. Detect and retry.
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            current_key = await _do_race_window_promotes_unrepaired_data(
                manager, servers, cql, ks, token, scylla_path, current_key)
            return
        except _LeadershipTransferred as e:
            logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")

    pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
                "could not run the test without coordinator interference.")


# ----------------------------------------------------------------------------
# Tombstone GC safety tests
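The refactor above splits the racy core into `_do_race_window_promotes_unrepaired_data` and wraps it in a bounded retry that treats a coordinator move as a signal to start over. The wrapper pattern in isolation, as a sketch with illustrative names (the real core and exception are the ones defined above):

```python
# Bounded-retry wrapper distilled from the test above. Names here are
# illustrative; in the test the core is _do_race_window_promotes_unrepaired_data
# and the sentinel is _LeadershipTransferred.
import logging

logger = logging.getLogger(__name__)


class LeadershipTransferred(Exception):
    """Signals that external interference invalidated this attempt."""


async def run_with_retries(core, max_attempts: int = 5):
    # Retry the racy core a bounded number of times; any other exception
    # propagates immediately, so only the known interference is retried.
    for attempt in range(1, max_attempts + 1):
        try:
            return await core()
        except LeadershipTransferred as e:
            logger.warning("Attempt %d/%d: %s. Retrying.", attempt, max_attempts, e)
    raise AssertionError(f"interference persisted after {max_attempts} attempts")
```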
@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table

import pytest
import logging

@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
        coord3 = await get_topology_coordinator(manager)
        if coord3:
            break


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
    """Verify that _tablet_load_stats_refresh is properly joined during
    topology coordinator shutdown, even when a schema change notification
    triggers a refresh between run() completing and stop() being called.

    Reproduces the scenario using two injection points:
    - topology_coordinator_pause_before_stop: pauses after run() finishes
      but before stop() is called
    - refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
      so it's still in-flight during shutdown

    Without the join in stop(), the refresh task outlives the coordinator
    and accesses freed memory.
    """
    servers = await manager.servers_add(3)
    await manager.get_ready_cql(servers)

    async with new_test_keyspace(manager,
            "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        coord = await get_topology_coordinator(manager)
        host_ids = [await manager.get_host_id(s.server_id) for s in servers]
        coord_idx = host_ids.index(coord)
        coord_server = servers[coord_idx]

        log = await manager.server_open_log(coord_server.server_id)
        mark = await log.mark()

        # Injection B: pause between run() returning and stop() being called.
        await manager.api.enable_injection(
            coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)

        # Stepdown causes the topology coordinator to abort and shut down.
        logger.info("Triggering stepdown on coordinator")
        await trigger_stepdown(manager, coord_server)

        # Wait for injection B to fire. The coordinator has finished run() but
        # the schema change listener is still registered.
        mark, _ = await log.wait_for(
            "topology_coordinator_pause_before_stop: waiting", from_mark=mark)

        # Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
        # Enable it now so it only catches the notification-triggered call.
        await manager.api.enable_injection(
            coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)

        # CREATE TABLE fires on_create_column_family on the old coordinator which
        # fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
        # via with_scheduling_group on the gossip scheduling group.
        logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
        async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
            # Wait for injection A: refresh_tablet_load_stats() is now blocked before
            # accessing _shared_tm. The topology_coordinator is still alive (paused at B).
            await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)

            # Release injection B: coordinator proceeds through stop().
            # Without the fix, stop() returns quickly and run_topology_coordinator
            # frees the topology_coordinator frame. With the fix, stop() blocks at
            # _tablet_load_stats_refresh.join() until injection A is released.
            logger.info("Releasing injection B: coordinator will stop")
            await manager.api.message_injection(
                coord_server.ip_addr, "topology_coordinator_pause_before_stop")

            # Release injection A: refresh_tablet_load_stats() resumes and accesses
            # this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
            # points to freed memory and ASan detects heap-use-after-free.
            logger.info("Releasing injection A: refresh resumes")
            await manager.api.message_injection(
                coord_server.ip_addr, "refresh_tablet_load_stats_pause")

            # If the bug is present, the node crashed. read_barrier will fail.
            await read_barrier(manager.api, coord_server.ip_addr)
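The test above drives a strict interleaving with two pause points, releasing B before A so the join in stop() is what keeps the refresh safe. The choreography modelled with `asyncio.Event` in place of the error injections; this is a sketch of the ordering only, not the Scylla test framework API.

```python
# The two-injection choreography modelled with asyncio.Events standing in
# for the error injection points (all names illustrative).
import asyncio

pause_before_stop = asyncio.Event()   # injection B
refresh_pause = asyncio.Event()       # injection A
refresh_started = asyncio.Event()


async def refresh(state: dict) -> None:
    refresh_started.set()
    await refresh_pause.wait()        # held in-flight during shutdown
    state["ok"] = True                # must run before state is torn down


async def coordinator(state: dict) -> None:
    task = asyncio.create_task(refresh(state))  # the CREATE TABLE trigger
    await pause_before_stop.wait()    # run() done, stop() not yet called
    await task                        # stop(): the join added by the fix


async def main() -> None:
    state: dict = {}
    coord = asyncio.create_task(coordinator(state))
    await refresh_started.wait()
    pause_before_stop.set()           # release B: proceed through stop()
    refresh_pause.set()               # release A: refresh resumes safely
    await coord
    assert state["ok"]


asyncio.run(main())
```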
@@ -435,8 +435,9 @@ async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager:
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    async def get_replication_options(ks: str):
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
    async def get_replication_options(ks: str, host, ip_addr):
        await read_barrier(manager.api, ip_addr)
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
        repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
        return repl

@@ -451,43 +452,44 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
    host_ids = [await manager.get_host_id(s.server_id) for s in servers]

    cql = manager.get_cql()
    host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

    await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks1.t (pk int primary key);")
    repl = await get_replication_options("ks1")
    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
    assert repl['dc1'] == '1'

    await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
    await cql.run_async("create table ks2.t (pk int primary key);")
    repl = await get_replication_options("ks2")
    repl = await get_replication_options("ks2", host, servers[0].ip_addr)
    assert repl['dc1'] == '1'
    assert repl['dc2'] == '2'

    await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
    await cql.run_async("create table ks3.t (pk int primary key);")
    repl = await get_replication_options("ks3")
    repl = await get_replication_options("ks3", host, servers[0].ip_addr)
    assert repl['dc1'] == '1'

    await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
    await cql.run_async("create table ks4.t (pk int primary key);")
    repl = await get_replication_options("ks4")
    repl = await get_replication_options("ks4", host, servers[0].ip_addr)
    assert repl['dc1'] == '1'

    await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks5.t (pk int primary key);")
    repl = await get_replication_options("ks5")
    repl = await get_replication_options("ks5", host, servers[0].ip_addr)
    assert repl['dc1'] == '2'
    assert repl['dc2'] == '2'

    await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks6.t (pk int primary key);")
    repl = await get_replication_options("ks6")
    repl = await get_replication_options("ks6", host, servers[0].ip_addr)
    assert repl['dc1'] == '2'

    [await manager.api.disable_injection(s.ip_addr, injection) for s in servers]

    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
    repl = await get_replication_options("ks1")
    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
    assert repl['dc1'] == ['rack1b']

    tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")

@@ -497,7 +499,7 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
    assert r.replicas[0][0] == host_ids[1]

    await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
    repl = await get_replication_options("ks2")
    repl = await get_replication_options("ks2", host, servers[0].ip_addr)
    assert repl['dc1'] == ['rack1a']
    assert len(repl['dc2']) == 2
    assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']

@@ -523,13 +525,13 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
        pass

    await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
    repl = await get_replication_options("ks5")
    repl = await get_replication_options("ks5", host, servers[0].ip_addr)
    assert len(repl['dc1']) == 2
    assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
    assert repl['dc2'] == '2'

    await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
    repl = await get_replication_options("ks6")
    repl = await get_replication_options("ks6", host, servers[0].ip_addr)
    assert repl['dc1'] == '2'
    assert len(repl['dc2']) == 1
    assert repl['dc2'][0] == 'rack2a'

@@ -537,8 +539,9 @@ async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    async def get_replication_options(ks: str):
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
    async def get_replication_options(ks: str, host, ip_addr):
        await read_barrier(manager.api, ip_addr)
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
        repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
        return repl

@@ -551,10 +554,11 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
               await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]

    cql = manager.get_cql()
    host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

    await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks1.t (pk int primary key);")
    repl = await get_replication_options("ks1")
    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
    assert repl['dc1'] == '1'

    await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")

@@ -574,19 +578,19 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
    servers = servers[0:-1]

    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
    repl = await get_replication_options("ks1")
    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
    assert repl['dc1'] == ['rack1b']

    logging.info("Rolling restart")
    await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])

    await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
    repl = await get_replication_options("ks2")
    repl = await get_replication_options("ks2", host, servers[0].ip_addr)
    assert len(repl['dc1']) == 2
    assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']

    await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
    repl = await get_replication_options("ks3")
    repl = await get_replication_options("ks3", host, servers[0].ip_addr)
    assert len(repl['dc1']) == 1
    assert len(repl['dc2']) == 1
    assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']

@@ -602,7 +606,7 @@ async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager:
    assert failed

    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
    repl = await get_replication_options("ks1")
    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
    assert len(repl['dc1']) == 1
    assert repl['dc1'][0] == 'rack1b'
    assert len(repl['dc2']) == 1

@@ -1109,8 +1113,9 @@ async def test_multi_rf_increase_before_decrease_0_N(request: pytest.FixtureRequ
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    async def get_replication_options(ks: str):
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
    async def get_replication_options(ks: str, host, ip_addr):
        await read_barrier(manager.api, ip_addr)
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'", host=host)
        repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
        return repl

@@ -1128,10 +1133,11 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
    host_ids = [await manager.get_host_id(s.server_id) for s in servers]

    cql = manager.get_cql()
    host = (await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 30))[0]

    await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks1.t (pk int primary key);")
    repl = await get_replication_options("ks1")
    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
    assert repl['dc1'] == '1'

    [await manager.api.disable_injection(s.ip_addr, numeric_injection) for s in servers]

@@ -1165,7 +1171,7 @@ async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureR
        failed = True
    assert failed

    repl = await get_replication_options("ks1")
    repl = await get_replication_options("ks1", host, servers[0].ip_addr)
    assert repl['dc1'] == '1'

@pytest.mark.asyncio
@@ -18,6 +18,7 @@
#include <seastar/testing/test_case.hh>

#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "ldap_common.hh"
#include "service/migration_manager.hh"

@@ -681,3 +682,41 @@ SEASTAR_TEST_CASE(ldap_config) {
    },
    make_ldap_config());
}

// Reproduces the race between the cache pruner and the permission
// loader lifecycle during shutdown. Refs SCYLLADB-1679.
SEASTAR_TEST_CASE(ldap_pruner_no_crash_after_loader_cleared) {
    auto cfg = make_ldap_config();
    cfg->permissions_update_interval_in_ms.set(1);

    auto call_count = seastar::make_lw_shared<int>(0);

    co_await do_with_cql_env_thread([call_count](cql_test_env& env) {
        auto& cache = env.auth_cache().local();

        testlog.info("Populating 50 cache entries");
        for (int i = 0; i < 50; i++) {
            auto r = auth::make_data_resource("system", fmt::format("t{}", i));
            cache.get_permissions(auth::role_or_anonymous(), r).get();
        }

        testlog.info("Installing slow permission loader (10ms per call)");
        cache.set_permission_loader(
            [call_count] (const auth::role_or_anonymous&, const auth::resource&)
                    -> seastar::future<auth::permission_set> {
                ++(*call_count);
                co_await seastar::sleep(std::chrono::milliseconds(10));
                co_return auth::permission_set();
            });

        testlog.info("Waiting for pruner to start reloading");
        while (*call_count == 0) {
            seastar::sleep(std::chrono::milliseconds(1)).get();
        }

        testlog.info("Pruner started, letting teardown run");
    }, cfg);

    testlog.info("Loader called {} times", *call_count);
}
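The test above provokes teardown while the pruner is mid-reload through a deliberately slow loader. As a rough model of the hazard and the mitigation, here is a Python asyncio sketch with invented names (the real code is the Seastar auth cache and its pruner fiber): snapshot the loader slot before calling through it, and join the fiber before shutdown clears the slot, as the service::stop() ordering earlier in this comparison does.

```python
# The pruner/loader shutdown race in miniature (asyncio sketch, names invented):
# a periodic reload fiber calls through a loader slot that shutdown clears.
import asyncio


class Cache:
    def __init__(self):
        self.loader = None

    async def reload_all_permissions(self):
        loader = self.loader        # snapshot; may be cleared concurrently
        if loader is None:
            return                  # loader already cleared: nothing to do
        await loader()


async def pruner(cache: Cache, stop: asyncio.Event):
    while not stop.is_set():
        await cache.reload_all_permissions()
        await asyncio.sleep(0.001)  # permissions_update_interval stand-in


async def main():
    cache = Cache()
    cache.loader = lambda: asyncio.sleep(0.01)  # slow loader, as in the test
    stop = asyncio.Event()
    fiber = asyncio.create_task(pruner(cache, stop))
    await asyncio.sleep(0.05)
    stop.set()
    await fiber                     # join the fiber first...
    cache.loader = None             # ...then clear the loader slot


asyncio.run(main())
```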
@@ -126,6 +126,9 @@ class CppFile(pytest.File, ABC):
        return args

    def collect(self) -> Iterator[CppTestCase]:
        if BUILD_MODE not in self.stash:
            return

        custom_args = self.suite_config.get("custom_args", {}).get(self.test_name, DEFAULT_CUSTOM_ARGS)

        for test_case in self.list_test_cases():

@@ -163,6 +163,11 @@ def scylla_binary(testpy_test) -> Path:


def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
    items[:] = [
        item for item in items
        if (parent_file := item.getparent(cls=pytest.File)) is not None
        and BUILD_MODE in parent_file.stash
    ]
    for item in items:
        modify_pytest_item(item=item)

@@ -285,7 +290,10 @@ def pytest_configure(config: pytest.Config) -> None:
    pytest_log_dir.mkdir(parents=True, exist_ok=True)
    if not _pytest_config.getoption("--save-log-on-success"):
        for file in pytest_log_dir.glob("*"):
            file.unlink()
            # This will help in case framework tests are executed with test.py even if it's the wrong way to run them.
            # test_no_bare_skip_markers_in_collection uses a subprocess to run a collection that has led to a race
            # condition, especially with repeat.
            file.unlink(missing_ok=True)

    _pytest_config.stash[PYTEST_LOG_FILE] = f"{pytest_log_dir}/pytest_main_{HOST_ID}.log"

@@ -340,7 +348,8 @@ def pytest_collect_file(file_path: pathlib.Path,
    repeats = list(product(build_modes, parent.config.run_ids))

    if not repeats:
        return []
        parent.stash[REPEATING_FILES].remove(file_path)
        return collectors

    ihook = parent.ihook
    collectors = list(chain(collectors, chain.from_iterable(

@@ -75,6 +75,7 @@ def test_no_bare_skip_markers_in_collection():
        "--collect-only",
        "--ignore=boost", "--ignore=raft",
        "--ignore=ldap", "--ignore=vector_search",
        "--ignore=unit",
        "-p", "no:sugar"],
        capture_output=True, text=True,
        cwd=str(_TEST_ROOT),