From 4ce17d20dfd14f88eb15421fe74265e31c0bc24a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 5 Mar 2026 10:51:48 +0200 Subject: [PATCH 1/2] replica/database: consolidate the two database_apply error injections Into a single database_apply one. Add three parameters: * ks_name and cf_name to filter the tables to be affected * what - what to do: throw or wait This leads to smaller footprint in the code and improved filtering for table names at the cost of some extra error injection params in the tests. (cherry picked from commit f375aae257c90da19c35c24a4dd733ba6153304a) --- replica/database.cc | 16 +++++++-------- test/cluster/test_automatic_cleanup.py | 20 +++++++++---------- .../test_data_resurrection_in_memtable.py | 2 +- test/cluster/test_read_repair.py | 2 +- test/cluster/test_repair.py | 2 +- test/cluster/test_tablets2.py | 2 +- test/storage/test_out_of_space_prevention.py | 2 +- 7 files changed, 23 insertions(+), 23 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 2b3436f915..0f30ffbb72 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2246,16 +2246,16 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra // assume failure until proven otherwise auto update_writes_failed = defer([&] { ++_stats->total_writes_failed; }); - utils::get_local_injector().inject("database_apply", [&s] () { - if (!is_system_keyspace(s->ks_name())) { - throw std::runtime_error("injected error"); + co_await utils::get_local_injector().inject("database_apply", [&s] (auto& handler) -> future<> { + if (s->ks_name() != handler.get("ks_name") || s->cf_name() != handler.get("cf_name")) { + co_return; } - }); - co_await utils::get_local_injector().inject("database_apply_wait", [&] (auto& handler) -> future<> { - if (s->cf_name() == handler.get("cf_name")) { - dblog.info("database_apply_wait: wait"); + if (handler.get("what") == "throw") { + throw std::runtime_error(format("injected error for {}.{}", s->ks_name(), s->cf_name())); + } else if (handler.get("what") == "wait") { + dblog.info("database_apply: wait"); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - dblog.info("database_apply_wait: done"); + dblog.info("database_apply: done"); } }); diff --git a/test/cluster/test_automatic_cleanup.py b/test/cluster/test_automatic_cleanup.py index 442db2300e..672fe8a38b 100644 --- a/test/cluster/test_automatic_cleanup.py +++ b/test/cluster/test_automatic_cleanup.py @@ -79,11 +79,11 @@ async def test_no_cleanup_when_unnecessary(manager: ManagerClient): async def test_cleanup_waits_for_stale_writes(manager: ManagerClient): """Scenario: * Start two nodes, a vnodes-based table with an rf=2 - * Run insert while bootstrapping another node, suspend this insert in database_apply_wait injection + * Run insert while bootstrapping another node, suspend this insert in database_apply injection * Bootstrap succeeds, capture the final topology version * Start decommission -> triggers global barrier, which we fail on another injection * This failure is not fatal, the cleanup procedure continues and blocks on waiting for the stale write - * We release the database_apply_wait injection, cleanup succeeds, write fails with 'stale topology exception' + * We release the database_apply injection, cleanup succeeds, write fails with 'stale topology exception' """ config = {'tablets_mode_for_new_keyspaces': 'disabled'} @@ -119,15 +119,15 @@ async def test_cleanup_waits_for_stale_writes(manager: ManagerClient): # Have a write request with write_both_read_new version stuck on both nodes: # - On the first node, this exercises the coordinator fencing code path. # - On the second node, this exercises the replica code path. - logger.info("Enable 'database_apply_wait' injection") + logger.info("Enable 'database_apply' injection") for s in servers[:-1]: - await manager.api.enable_injection(s.ip_addr, 'database_apply_wait', - False, parameters={'cf_name': 'my_test_table'}) + await manager.api.enable_injection(s.ip_addr, 'database_apply', + False, parameters={'ks_name': ks, 'cf_name': 'my_test_table', 'what': 'wait'}) logger.info("Start write") write_task = cql.run_async(f"INSERT INTO {ks}.my_test_table (pk, c) VALUES (1, 1)", host=hosts[0]) - logger.info("Waiting for database_apply_wait") - await log0.wait_for("database_apply_wait: wait") - await log1.wait_for("database_apply_wait: wait") + logger.info("Waiting for database_apply") + await log0.wait_for("database_apply: wait") + await log1.wait_for("database_apply: wait") # Finish bootstrapping the node logger.info("Trigger topology_coordinator/write_both_read_new/after_barrier") @@ -159,9 +159,9 @@ async def test_cleanup_waits_for_stale_writes(manager: ManagerClient): assert len(flush_matches) == 0 # Release the write -- the cleanup process should resume and the decommission succeed - await manager.api.message_injection(servers[0].ip_addr, "database_apply_wait") + await manager.api.message_injection(servers[0].ip_addr, "database_apply") await log0.wait_for("vnodes_cleanup: flush_all_tables", timeout=15) - await manager.api.message_injection(servers[1].ip_addr, "database_apply_wait") + await manager.api.message_injection(servers[1].ip_addr, "database_apply") await log1.wait_for("vnodes_cleanup: flush_all_tables", timeout=15) await decommission_task diff --git a/test/cluster/test_data_resurrection_in_memtable.py b/test/cluster/test_data_resurrection_in_memtable.py index 690fcc63f7..e97f44e20c 100644 --- a/test/cluster/test_data_resurrection_in_memtable.py +++ b/test/cluster/test_data_resurrection_in_memtable.py @@ -55,7 +55,7 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l for write_statement, delete_statement in statement_pairs: execute_with_tracing(cql, SimpleStatement(write_statement.format(ks=ks), consistency_level=ConsistencyLevel.ALL), log = True) - await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "tbl", "what": "throw"}) execute_with_tracing(cql, SimpleStatement(delete_statement.format(ks=ks), consistency_level=ConsistencyLevel.LOCAL_QUORUM), log = True) await manager.api.disable_injection(node3.ip_addr, "database_apply") diff --git a/test/cluster/test_read_repair.py b/test/cluster/test_read_repair.py index 0eb4c114fa..5a4a8755df 100644 --- a/test/cluster/test_read_repair.py +++ b/test/cluster/test_read_repair.py @@ -330,7 +330,7 @@ async def test_read_repair_with_trace_logging(request, manager): insert_stmt = cql.prepare(f"INSERT INTO {ks}.t (pk, ck, c) VALUES (?, ?, ?)") insert_stmt.consistency_level = ConsistencyLevel.ONE - await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "t", "what": "throw"}) for ck in range(0, 100): await cql.run_async(insert_stmt, (0, ck, ck)) await manager.api.disable_injection(node1.ip_addr, "database_apply") diff --git a/test/cluster/test_repair.py b/test/cluster/test_repair.py index c46dae4f1c..48422bfc27 100644 --- a/test/cluster/test_repair.py +++ b/test/cluster/test_repair.py @@ -313,7 +313,7 @@ async def test_repair_timtestamp_difference(manager): other_nodes = [n for n in nodes if n != node] for other_node in other_nodes: - await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, {}) + await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, parameters={"ks_name": "ks", "cf_name": "tbl", "what": "throw"}) await manager.driver_connect(node) diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 69d902368a..dc482993c8 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -2156,7 +2156,7 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)") insert_stmt.consistency_level = ConsistencyLevel.ONE - await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "test", "what": "throw"}) pks = range(256, 512) await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks]) await manager.api.disable_injection(servers[0].ip_addr, "database_apply") diff --git a/test/storage/test_out_of_space_prevention.py b/test/storage/test_out_of_space_prevention.py index 9027a57ced..206dd65cb8 100644 --- a/test/storage/test_out_of_space_prevention.py +++ b/test/storage/test_out_of_space_prevention.py @@ -523,7 +523,7 @@ async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes insert_stmt = cql.prepare(f"INSERT INTO {cf} (pk, t) VALUES (?, ?)") insert_stmt.consistency_level = ConsistencyLevel.ONE - await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": table, "what": "throw"}) pks = range(256, 512) await asyncio.gather(*[cql.run_async(insert_stmt, (k, f'{k}')) for k in pks]) await manager.api.disable_injection(servers[0].ip_addr, "database_apply") From 9a36e7f362317a588a1c0b8cd6bdeee8ba3b76e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 20 Apr 2026 16:35:15 +0300 Subject: [PATCH 2/2] test: fix flaky test_read_repair_with_trace_logging by reading tracing with CL=ALL Tracing events are written to system_traces.events with CL=ANY, so they are only guaranteed to be present on the local node of the query coordinator. Reading them back with the driver default (CL=LOCAL_ONE) may route the query to a replica that has not yet received all events, causing the assertion on 'digest mismatch, starting read repair' to fail intermittently. Fix execute_with_tracing() to read tracing via the ResponseFuture API with query_cl=ConsistencyLevel.ALL, so events from all replicas are merged before the caller inspects them. Fixes: SCYLLADB-1707 Closes scylladb/scylladb#29566 (cherry picked from commit b49cf6247f659658e8485a98a192547555c537f4) --- test/pylib/util.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/pylib/util.py b/test/pylib/util.py index f60f00699c..5d09adcc7e 100644 --- a/test/pylib/util.py +++ b/test/pylib/util.py @@ -393,7 +393,12 @@ def execute_with_tracing(cql : Session, statement : str | Statement, log : bool cql_execute_extra_kwargs['trace'] = True query_result = cql.execute(statement, *cql_execute_extra_args, **cql_execute_extra_kwargs) - tracing = query_result.get_all_query_traces(max_wait_sec_per=900) + # Tracing events are written to system_traces.events with CL=ANY, so they are + # only guaranteed to be present on the local node of the query coordinator. + # Reading them back with the driver default (CL=LOCAL_ONE) may hit a replica + # that has not yet received all events, causing intermittent failures. + # Using CL=ALL ensures events from all replicas are merged. + tracing = query_result.response_future.get_all_query_traces(max_wait_per=900, query_cl=ConsistencyLevel.ALL) ret = [] page_traces = []