diff --git a/test/cluster/test_unfinished_writes_during_shutdown.py b/test/cluster/test_unfinished_writes_during_shutdown.py index 026349e5e5..00f21b8956 100644 --- a/test/cluster/test_unfinished_writes_during_shutdown.py +++ b/test/cluster/test_unfinished_writes_during_shutdown.py @@ -12,6 +12,7 @@ import time from cassandra import ConsistencyLevel # type: ignore from cassandra.query import SimpleStatement # type: ignore from test.pylib.manager_client import ManagerClient +from test.pylib.internal_types import ServerInfo from test.pylib.util import wait_for_cql_and_get_hosts from test.cluster.util import check_token_ring_and_group0_consistency, new_test_keyspace from test.pylib.util import wait_for @@ -23,19 +24,37 @@ from cassandra.cluster import ConnectionException, NoHostAvailable # type: igno logger = logging.getLogger(__name__) @pytest.mark.asyncio +@pytest.mark.xfail(reason="SCYLLADB-1842") @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') async def test_unfinished_writes_during_shutdown(request: pytest.FixtureRequest, manager: ManagerClient) -> None: - """ Test a simultaneous topology change and write query during shutdown, which may cause the node to get stuck (see https://github.com/scylladb/scylladb/issues/23665). + """ Test a simultaneous topology change and write query during shutdown, + which may cause the node to get stuck. - 1. Create a keyspace with replication factor 3 - 2. Start 3 servers - 3. Use error injection to pause the 3rd node on a topology change (`barrier_and_drain`) - 4 Trigger a topology change by adding a new node to the cluster. - 5. Make sure the topology change was paused on the node 3 (`barrier_and_drain`) - 6. Now with error injection, make sure node 2 will pause before sending a write acknowledgment. - 7. Send a write query to the node 3. (which already should be paused on the topology change operation) - 8. 
The query should have completed, but one write to node 2 should be remaining, making write_response_handler block the topology change in node 3 - 9. Start node 3 shutdown. The shutdown should hang since the one of the replicas did not send the response and therefore the response write handler still holds the ERM. + The test runs twice on the same cluster: + - First with the target node not being the topology coordinator (hangs in + uninit_messaging_service, see scylladb/scylladb#23665). + - Then with the target node being the topology coordinator (hangs in + do_drain -> wait_for_group0_stop, see SCYLLADB-1842). + + Each run: + 1. Enable pause_before_barrier_and_drain injection on the target. + 2. Trigger a topology change by adding a new node to the cluster. + 3. Wait for the first barrier_and_drain to hit the injection on + the target. + 4. Pause write responses on the other running server via injection. + 5. Send a write with CL=ONE to the target node. It completes + immediately from the target's own local replica (the target acts as the write coordinator). The mutation + is still sent to the other replica, whose response is paused, + keeping the write_response_handler alive. + 6. Release the first barrier_and_drain (the write handler's ERM + version is still current). Wait for the second + barrier_and_drain — by now topology_state_load has installed + a new token_metadata version, making the write handler's ERM + stale. + 7. Start graceful shutdown of the target node. + 8. Disable the injection to release all paused barrier_and_drain + handlers, then unblock the delayed write response. + 9. Verify shutdown completes within 15s (deadlock = test failure). 
""" logger.info("Creating a new cluster") @@ -43,89 +62,130 @@ async def test_unfinished_writes_during_shutdown(request: pytest.FixtureRequest, '--logger-log-level', 'debug_error_injection=debug', ] - servers = await manager.servers_add(3, auto_rack_dc="dc1", cmdline=cmdline) + await manager.servers_add(2, auto_rack_dc="dc1", cmdline=cmdline) - cql, hosts = await manager.get_ready_cql(servers) + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};") as ks: - async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};") as ks: - await cql.run_async(f"CREATE TABLE {ks}.t (pk int primary key, v int)") - target_host = hosts[2] - target_server = servers[2] + # Preconditions: exactly 2 running nodes, RF=2, so both nodes + # are replicas for every partition. The target node is always a + # replica (local write satisfies CL=ONE) and the other node is + # always a replica (its paused response keeps the handler alive). + # The caller restores these preconditions between runs. + async def do_test(target_server: ServerInfo) -> ServerInfo: + """Run the test scenario. 
Returns the newly added server.""" + other_server = next(s for s in running if s.server_id != target_server.server_id) + target_host = next(h for h in hosts + if h.address == str(target_server.rpc_address)) - # Make the target node stop before locking the ERM - logger.info( - f"Enabling injection 'pause_before_barrier_and_drain' on the target server {target_server}") - target_server_log = await manager.server_open_log(target_server.server_id) - await manager.api.enable_injection(target_server.ip_addr, "pause_before_barrier_and_drain", one_shot=False) + logger.info( + f"Enabling injection 'pause_before_barrier_and_drain' on the target server {target_server}") + target_server_log = await manager.server_open_log(target_server.server_id) + await manager.api.enable_injection(target_server.ip_addr, "pause_before_barrier_and_drain", one_shot=False) - async def do_add_node(): + # Start adding a new node, causing a topology change that issues barrier_and_drain logger.info("Adding a node to the cluster") - try: - await manager.server_add(property_file={"dc": "dc1", "rack": servers[0].rack}) - except Exception as exc: - logger.error(f"Failed to add a new node: {exc}") + add_last_node_task = asyncio.create_task( + manager.server_add(property_file={"dc": "dc1", "rack": running[0].rack})) - # Start adding a new node to the cluster, causing a topology change that will issue a barrier and drain - add_last_node_task = asyncio.create_task(do_add_node()) + # Wait for the topology change to start + logger.info("Waiting for a topology change to start") + mark, _ = await target_server_log.wait_for("pause_before_barrier_and_drain: waiting for message") - # Wait for the topology change to start - logger.info("Waiting for a topology change to start") - mark, _ = await target_server_log.wait_for("pause_before_barrier_and_drain: waiting for message") + # Pause write responses on the other node so it keeps the + # write_response_handler alive on the target. 
+ await inject_error_on(manager, "storage_proxy_write_response_pause", [other_server]) + other_server_log = await manager.server_open_log(other_server.server_id) - # Now make sure responses on one of the replicas will be delayed - server_to_pause = servers[1] - await inject_error_on(manager, "storage_proxy_write_response_pause", [server_to_pause]) - logger.info( - f"Pausing responses on one of the replicas {server_to_pause}") - paused_server_logs = await manager.server_open_log(server_to_pause.server_id) + # Send a write with CL=ONE so it completes from the coordinator's + # local replica without waiting for the paused server. + await cql.run_async( + SimpleStatement(f"insert into {ks}.t (pk, v) values ({32765}, {17777})", + consistency_level=ConsistencyLevel.ONE), + host=target_host) - # Now send a write query to the target node that will be shut down. - await cql.run_async(f"insert into {ks}.t (pk, v) values ({32765}, {17777})", host=target_host) + # Make sure the other replica got the write request and is paused. + await other_server_log.wait_for("storage_proxy_write_response_pause: waiting for message") - # Make sure the node that's response is paused, got the write request. - await paused_server_logs.wait_for("storage_proxy_write_response_pause: waiting for message") + # Release the first barrier_and_drain — it completes because the write + # handler holds the current version (not stale yet). + await manager.api.message_injection(target_server.ip_addr, 'pause_before_barrier_and_drain') - # Release the first barrier_and_drain — it completes because the write - # handler holds the current version (not stale yet). - await manager.api.message_injection(target_server.ip_addr, 'pause_before_barrier_and_drain') + # Wait for the second barrier_and_drain. Between the first and second, + # topology_state_load installs a new token_metadata version. The write + # handler still holds the old version's ERM, which is now stale. 
+ await target_server_log.wait_for("pause_before_barrier_and_drain: waiting for message", from_mark=mark) - # Wait for the second barrier_and_drain. Between the first and second, - # topology_state_load installs a new token_metadata version. The write - # handler still holds the old version's ERM, which is now stale. - await target_server_log.wait_for("pause_before_barrier_and_drain: waiting for message", from_mark=mark) - - # Start shutdown of the query coordinator - async def do_shutdown(): + # Start shutdown of the target node logger.info(f"Starting shutdown of node {target_server.server_id}") - await manager.server_stop_gracefully(target_server.server_id) + shutdown_task = asyncio.create_task( + manager.server_stop_gracefully(target_server.server_id)) - shutdown_task = asyncio.create_task(do_shutdown()) + # Wait for stop_transport to complete. At this point the local + # messaging service is shut down, so MUTATION_DONE from the other + # replica can no longer reach the coordinator — the write handler + # has no chance to complete naturally. Next we release the paused + # barrier_and_drain and unblock the write response. The + # barrier_and_drain handler calls stale_versions_in_use() which + # blocks until the stale ERM held by the write handler is released. + # The test verifies that shutdown succeeds because the fix + # forcefully cancels write handlers during storage_proxy shutdown, + # releasing the stale ERM and unblocking stale_versions_in_use(). + await target_server_log.wait_for("Stop transport: done") - # Wait for the shutdown to start - await target_server_log.wait_for("Stop transport: done") + # Disable the injection. This releases the paused barrier_and_drain + # handler (and any subsequent ones that arrived during stop_transport) + # so they don't block uninit_messaging_service. + await manager.api.disable_injection(target_server.ip_addr, "pause_before_barrier_and_drain") - # Disable the injection. 
This releases the paused barrier_and_drain - # handler (and any subsequent ones that arrived during stop_transport) - # so they don't block uninit_messaging_service. - await manager.api.disable_injection(target_server.ip_addr, "pause_before_barrier_and_drain") + logger.info(f"Unblocking writes on the node {other_server}") + await manager.api.message_injection(other_server.ip_addr, 'storage_proxy_write_response_pause') - logger.info(f"Unblocking writes on the node {server_to_pause}") - await manager.api.message_injection(server_to_pause.ip_addr, 'storage_proxy_write_response_pause') + logger.info("Waiting for the shutdown to complete") + try: + await asyncio.wait_for(shutdown_task, timeout=15) + except asyncio.TimeoutError: + # Deadlock reproduced — shutdown hung because stale_versions_in_use + # blocks on the write handler holding a stale token_metadata version. + # Kill all servers including any being bootstrapped (stuck because + # the coordinator is dead). This unblocks the server-side addserver + # handler so _after_test doesn't wait 120s for it. + logger.info("Shutdown did not complete within the timeout, killing all servers") + for s in await manager.all_servers() + await manager.starting_servers(): + await manager.server_stop(s.server_id) + pytest.fail(f"Shutdown did not complete within 15s — deadlock reproduced" + f" (target={target_server})") - logger.info("Waiting for the shutdown to complete") - try: - await asyncio.wait_for(shutdown_task, timeout=15) - except asyncio.TimeoutError: - # Deadlock reproduced — shutdown hung because stale_versions_in_use - # blocks on the write handler holding a stale token_metadata version. - # We must explicitly kill the node here: the manager's _after_test - # handler waits up to 120s for all outstanding tasks (including - # the stuck stop_gracefully request) before teardown proceeds. - # Killing the process lets stop_gracefully's cmd.wait() return, - # which unblocks _after_test. 
- logger.info("Shutdown did not complete within the timeout, killing the node") - await manager.server_stop(target_server.server_id) - pytest.fail("Shutdown did not complete within 15s — deadlock reproduced") + # Restart the target node before waiting for addnode — the addnode + # needs raft quorum which requires the target to be alive. + await manager.server_start(target_server.server_id) - logger.info("Waiting for addnode to complete") - await add_last_node_task + logger.info("Waiting for addnode to complete") + return await add_last_node_task + + async def pick_target(is_coordinator: bool) -> ServerInfo: + coordinator_host_id = await get_topology_coordinator(manager) + for s in running: + hid = await manager.get_host_id(s.server_id) + if (hid == coordinator_host_id) == is_coordinator: + return s + assert False, f"Could not find {'coordinator' if is_coordinator else 'non-coordinator'} among {running}" + + # Run 1: target is a non-coordinator node + running = await manager.running_servers() + cql, hosts = await manager.get_ready_cql(running) + await cql.run_async(f"CREATE TABLE {ks}.t (pk int primary key, v int)") + target_server = await pick_target(is_coordinator=False) + logger.info(f"=== Run 1: target={target_server} (non-coordinator) ===") + new_server = await do_test(target_server) + + # Restore the cluster to its original state: decommission the added + # node. do_test already restarted the target. This keeps 2 nodes with + # RF=2, so both are replicas for every partition — same as run 1. + await manager.decommission_node(new_server.server_id) + + # Run 2: target is the topology coordinator + running = await manager.running_servers() + cql, hosts = await manager.get_ready_cql(running) + target_server = await pick_target(is_coordinator=True) + logger.info(f"=== Run 2: target={target_server} (coordinator) ===") + await do_test(target_server)