diff --git a/replica/database.cc b/replica/database.cc index 2b3436f915..0f30ffbb72 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2246,16 +2246,16 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra // assume failure until proven otherwise auto update_writes_failed = defer([&] { ++_stats->total_writes_failed; }); - utils::get_local_injector().inject("database_apply", [&s] () { - if (!is_system_keyspace(s->ks_name())) { - throw std::runtime_error("injected error"); + co_await utils::get_local_injector().inject("database_apply", [&s] (auto& handler) -> future<> { + if (s->ks_name() != handler.get("ks_name") || s->cf_name() != handler.get("cf_name")) { + co_return; } - }); - co_await utils::get_local_injector().inject("database_apply_wait", [&] (auto& handler) -> future<> { - if (s->cf_name() == handler.get("cf_name")) { - dblog.info("database_apply_wait: wait"); + if (handler.get("what") == "throw") { + throw std::runtime_error(format("injected error for {}.{}", s->ks_name(), s->cf_name())); + } else if (handler.get("what") == "wait") { + dblog.info("database_apply: wait"); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - dblog.info("database_apply_wait: done"); + dblog.info("database_apply: done"); } }); diff --git a/test/cluster/test_automatic_cleanup.py b/test/cluster/test_automatic_cleanup.py index 442db2300e..672fe8a38b 100644 --- a/test/cluster/test_automatic_cleanup.py +++ b/test/cluster/test_automatic_cleanup.py @@ -79,11 +79,11 @@ async def test_no_cleanup_when_unnecessary(manager: ManagerClient): async def test_cleanup_waits_for_stale_writes(manager: ManagerClient): """Scenario: * Start two nodes, a vnodes-based table with an rf=2 - * Run insert while bootstrapping another node, suspend this insert in database_apply_wait injection + * Run insert while bootstrapping another node, suspend this insert in database_apply injection * Bootstrap succeeds, capture the final topology version * Start decommission -> triggers global barrier, which we fail on another injection * This failure is not fatal, the cleanup procedure continues and blocks on waiting for the stale write - * We release the database_apply_wait injection, cleanup succeeds, write fails with 'stale topology exception' + * We release the database_apply injection, cleanup succeeds, write fails with 'stale topology exception' """ config = {'tablets_mode_for_new_keyspaces': 'disabled'} @@ -119,15 +119,15 @@ async def test_cleanup_waits_for_stale_writes(manager: ManagerClient): # Have a write request with write_both_read_new version stuck on both nodes: # - On the first node, this exercises the coordinator fencing code path. # - On the second node, this exercises the replica code path. - logger.info("Enable 'database_apply_wait' injection") + logger.info("Enable 'database_apply' injection") for s in servers[:-1]: - await manager.api.enable_injection(s.ip_addr, 'database_apply_wait', - False, parameters={'cf_name': 'my_test_table'}) + await manager.api.enable_injection(s.ip_addr, 'database_apply', + False, parameters={'ks_name': ks, 'cf_name': 'my_test_table', 'what': 'wait'}) logger.info("Start write") write_task = cql.run_async(f"INSERT INTO {ks}.my_test_table (pk, c) VALUES (1, 1)", host=hosts[0]) - logger.info("Waiting for database_apply_wait") - await log0.wait_for("database_apply_wait: wait") - await log1.wait_for("database_apply_wait: wait") + logger.info("Waiting for database_apply") + await log0.wait_for("database_apply: wait") + await log1.wait_for("database_apply: wait") # Finish bootstrapping the node logger.info("Trigger topology_coordinator/write_both_read_new/after_barrier") @@ -159,9 +159,9 @@ async def test_cleanup_waits_for_stale_writes(manager: ManagerClient): assert len(flush_matches) == 0 # Release the write -- the cleanup process should resume and the decommission succeed - await manager.api.message_injection(servers[0].ip_addr, "database_apply_wait") + await manager.api.message_injection(servers[0].ip_addr, "database_apply") await log0.wait_for("vnodes_cleanup: flush_all_tables", timeout=15) - await manager.api.message_injection(servers[1].ip_addr, "database_apply_wait") + await manager.api.message_injection(servers[1].ip_addr, "database_apply") await log1.wait_for("vnodes_cleanup: flush_all_tables", timeout=15) await decommission_task diff --git a/test/cluster/test_data_resurrection_in_memtable.py b/test/cluster/test_data_resurrection_in_memtable.py index 690fcc63f7..e97f44e20c 100644 --- a/test/cluster/test_data_resurrection_in_memtable.py +++ b/test/cluster/test_data_resurrection_in_memtable.py @@ -55,7 +55,7 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l for write_statement, delete_statement in statement_pairs: execute_with_tracing(cql, SimpleStatement(write_statement.format(ks=ks), consistency_level=ConsistencyLevel.ALL), log = True) - await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "tbl", "what": "throw"}) execute_with_tracing(cql, SimpleStatement(delete_statement.format(ks=ks), consistency_level=ConsistencyLevel.LOCAL_QUORUM), log = True) await manager.api.disable_injection(node3.ip_addr, "database_apply") diff --git a/test/cluster/test_read_repair.py b/test/cluster/test_read_repair.py index 0eb4c114fa..5a4a8755df 100644 --- a/test/cluster/test_read_repair.py +++ b/test/cluster/test_read_repair.py @@ -330,7 +330,7 @@ async def test_read_repair_with_trace_logging(request, manager): insert_stmt = cql.prepare(f"INSERT INTO {ks}.t (pk, ck, c) VALUES (?, ?, ?)") insert_stmt.consistency_level = ConsistencyLevel.ONE - await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "t", "what": "throw"}) for ck in range(0, 100): await cql.run_async(insert_stmt, (0, ck, ck)) await manager.api.disable_injection(node1.ip_addr, "database_apply") diff --git a/test/cluster/test_repair.py b/test/cluster/test_repair.py index c46dae4f1c..48422bfc27 100644 --- a/test/cluster/test_repair.py +++ b/test/cluster/test_repair.py @@ -313,7 +313,7 @@ async def test_repair_timtestamp_difference(manager): other_nodes = [n for n in nodes if n != node] for other_node in other_nodes: - await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, {}) + await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, parameters={"ks_name": "ks", "cf_name": "tbl", "what": "throw"}) await manager.driver_connect(node) diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 69d902368a..dc482993c8 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -2156,7 +2156,7 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)") insert_stmt.consistency_level = ConsistencyLevel.ONE - await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "test", "what": "throw"}) pks = range(256, 512) await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks]) await manager.api.disable_injection(servers[0].ip_addr, "database_apply") diff --git a/test/pylib/util.py b/test/pylib/util.py index f60f00699c..5d09adcc7e 100644 --- a/test/pylib/util.py +++ b/test/pylib/util.py @@ -393,7 +393,12 @@ def execute_with_tracing(cql : Session, statement : str | Statement, log : bool cql_execute_extra_kwargs['trace'] = True query_result = cql.execute(statement, *cql_execute_extra_args, **cql_execute_extra_kwargs) - tracing = query_result.get_all_query_traces(max_wait_sec_per=900) + # Tracing events are written to system_traces.events with CL=ANY, so they are + # only guaranteed to be present on the local node of the query coordinator. + # Reading them back with the driver default (CL=LOCAL_ONE) may hit a replica + # that has not yet received all events, causing intermittent failures. + # Using CL=ALL ensures events from all replicas are merged. + tracing = query_result.response_future.get_all_query_traces(max_wait_per=900, query_cl=ConsistencyLevel.ALL) ret = [] page_traces = [] diff --git a/test/storage/test_out_of_space_prevention.py b/test/storage/test_out_of_space_prevention.py index 9027a57ced..206dd65cb8 100644 --- a/test/storage/test_out_of_space_prevention.py +++ b/test/storage/test_out_of_space_prevention.py @@ -523,7 +523,7 @@ async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes insert_stmt = cql.prepare(f"INSERT INTO {cf} (pk, t) VALUES (?, ?)") insert_stmt.consistency_level = ConsistencyLevel.ONE - await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False) + await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": table, "what": "throw"}) pks = range(256, 512) await asyncio.gather(*[cql.run_async(insert_stmt, (k, f'{k}')) for k in pks]) await manager.api.disable_injection(servers[0].ip_addr, "database_apply")