replica/database: consolidate the two database_apply error injections

Into a single database_apply one. Add three parameters:
* ks_name and cf_name to filter the tables to be affected
* what - what to do: throw or wait

This leads to a smaller footprint in the code and improved filtering by
table name, at the cost of some extra error-injection parameters in the
tests.

(cherry picked from commit f375aae257)
This commit is contained in:
Botond Dénes
2026-03-05 10:51:48 +02:00
parent cdbf53e9d7
commit 4ce17d20df
7 changed files with 23 additions and 23 deletions

View File

@@ -2246,16 +2246,16 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra
// assume failure until proven otherwise
auto update_writes_failed = defer([&] { ++_stats->total_writes_failed; });
utils::get_local_injector().inject("database_apply", [&s] () {
if (!is_system_keyspace(s->ks_name())) {
throw std::runtime_error("injected error");
co_await utils::get_local_injector().inject("database_apply", [&s] (auto& handler) -> future<> {
if (s->ks_name() != handler.get("ks_name") || s->cf_name() != handler.get("cf_name")) {
co_return;
}
});
co_await utils::get_local_injector().inject("database_apply_wait", [&] (auto& handler) -> future<> {
if (s->cf_name() == handler.get("cf_name")) {
dblog.info("database_apply_wait: wait");
if (handler.get("what") == "throw") {
throw std::runtime_error(format("injected error for {}.{}", s->ks_name(), s->cf_name()));
} else if (handler.get("what") == "wait") {
dblog.info("database_apply: wait");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
dblog.info("database_apply_wait: done");
dblog.info("database_apply: done");
}
});

View File

@@ -79,11 +79,11 @@ async def test_no_cleanup_when_unnecessary(manager: ManagerClient):
async def test_cleanup_waits_for_stale_writes(manager: ManagerClient):
"""Scenario:
* Start two nodes, a vnodes-based table with an rf=2
* Run insert while bootstrapping another node, suspend this insert in database_apply_wait injection
* Run insert while bootstrapping another node, suspend this insert in database_apply injection
* Bootstrap succeeds, capture the final topology version
* Start decommission -> triggers global barrier, which we fail on another injection
* This failure is not fatal, the cleanup procedure continues and blocks on waiting for the stale write
* We release the database_apply_wait injection, cleanup succeeds, write fails with 'stale topology exception'
* We release the database_apply injection, cleanup succeeds, write fails with 'stale topology exception'
"""
config = {'tablets_mode_for_new_keyspaces': 'disabled'}
@@ -119,15 +119,15 @@ async def test_cleanup_waits_for_stale_writes(manager: ManagerClient):
# Have a write request with write_both_read_new version stuck on both nodes:
# - On the first node, this exercises the coordinator fencing code path.
# - On the second node, this exercises the replica code path.
logger.info("Enable 'database_apply_wait' injection")
logger.info("Enable 'database_apply' injection")
for s in servers[:-1]:
await manager.api.enable_injection(s.ip_addr, 'database_apply_wait',
False, parameters={'cf_name': 'my_test_table'})
await manager.api.enable_injection(s.ip_addr, 'database_apply',
False, parameters={'ks_name': ks, 'cf_name': 'my_test_table', 'what': 'wait'})
logger.info("Start write")
write_task = cql.run_async(f"INSERT INTO {ks}.my_test_table (pk, c) VALUES (1, 1)", host=hosts[0])
logger.info("Waiting for database_apply_wait")
await log0.wait_for("database_apply_wait: wait")
await log1.wait_for("database_apply_wait: wait")
logger.info("Waiting for database_apply")
await log0.wait_for("database_apply: wait")
await log1.wait_for("database_apply: wait")
# Finish bootstrapping the node
logger.info("Trigger topology_coordinator/write_both_read_new/after_barrier")
@@ -159,9 +159,9 @@ async def test_cleanup_waits_for_stale_writes(manager: ManagerClient):
assert len(flush_matches) == 0
# Release the write -- the cleanup process should resume and the decommission succeed
await manager.api.message_injection(servers[0].ip_addr, "database_apply_wait")
await manager.api.message_injection(servers[0].ip_addr, "database_apply")
await log0.wait_for("vnodes_cleanup: flush_all_tables", timeout=15)
await manager.api.message_injection(servers[1].ip_addr, "database_apply_wait")
await manager.api.message_injection(servers[1].ip_addr, "database_apply")
await log1.wait_for("vnodes_cleanup: flush_all_tables", timeout=15)
await decommission_task

View File

@@ -55,7 +55,7 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
for write_statement, delete_statement in statement_pairs:
execute_with_tracing(cql, SimpleStatement(write_statement.format(ks=ks), consistency_level=ConsistencyLevel.ALL), log = True)
await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False)
await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "tbl", "what": "throw"})
execute_with_tracing(cql, SimpleStatement(delete_statement.format(ks=ks), consistency_level=ConsistencyLevel.LOCAL_QUORUM), log = True)
await manager.api.disable_injection(node3.ip_addr, "database_apply")

View File

@@ -330,7 +330,7 @@ async def test_read_repair_with_trace_logging(request, manager):
insert_stmt = cql.prepare(f"INSERT INTO {ks}.t (pk, ck, c) VALUES (?, ?, ?)")
insert_stmt.consistency_level = ConsistencyLevel.ONE
await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=False)
await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "t", "what": "throw"})
for ck in range(0, 100):
await cql.run_async(insert_stmt, (0, ck, ck))
await manager.api.disable_injection(node1.ip_addr, "database_apply")

View File

@@ -313,7 +313,7 @@ async def test_repair_timtestamp_difference(manager):
other_nodes = [n for n in nodes if n != node]
for other_node in other_nodes:
await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, {})
await manager.api.enable_injection(other_node.ip_addr, "database_apply", False, parameters={"ks_name": "ks", "cf_name": "tbl", "what": "throw"})
await manager.driver_connect(node)

View File

@@ -2156,7 +2156,7 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie
insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
insert_stmt.consistency_level = ConsistencyLevel.ONE
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "test", "what": "throw"})
pks = range(256, 512)
await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")

View File

@@ -523,7 +523,7 @@ async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes
insert_stmt = cql.prepare(f"INSERT INTO {cf} (pk, t) VALUES (?, ?)")
insert_stmt.consistency_level = ConsistencyLevel.ONE
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": table, "what": "throw"})
pks = range(256, 512)
await asyncio.gather(*[cql.run_async(insert_stmt, (k, f'{k}')) for k in pks])
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")