From 8925f315966a2bc7affb46fc6360fd4b2bf42e2c Mon Sep 17 00:00:00 2001
From: Petr Gusev
Date: Sat, 4 Oct 2025 13:20:25 +0200
Subject: [PATCH] test_tablets_lwt: add test_lwt_shutdown

---
 service/paxos/paxos_state.cc     |  2 +
 service/storage_proxy.cc         |  2 +-
 test/cluster/test_tablets_lwt.py | 81 ++++++++++++++++++++++++++++++++
 3 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index 221ab66c90..e68ece11b2 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -283,6 +283,8 @@ future<> paxos_state::learn(storage_proxy& sp, paxos_store& paxos_store, schema_
         tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
     }
 
+    co_await utils::get_local_injector().inject("paxos_state_learn_after_mutate", utils::wait_for_message(5min));
+
     // We don't need to lock the partition key if there is no gap between loading paxos
     // state and saving it, and here we're just blindly updating.
     co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index b96ad061a8..f6f98ca960 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -7246,7 +7246,7 @@ future<> storage_proxy::abort_batch_writes() {
 
 future<>
 storage_proxy::stop() {
-    return make_ready_future<>();
+    co_await utils::get_local_injector().inject("storage_proxy::stop", utils::wait_for_message(5min));
 }
 
 locator::token_metadata_ptr storage_proxy::get_token_metadata_ptr() const noexcept {
diff --git a/test/cluster/test_tablets_lwt.py b/test/cluster/test_tablets_lwt.py
index 8d041c27ac..06d21d5876 100644
--- a/test/cluster/test_tablets_lwt.py
+++ b/test/cluster/test_tablets_lwt.py
@@ -830,3 +830,84 @@ async def test_lwts_for_special_tables(manager: ManagerClient):
         with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on CDC log tables: {ks}.test_scylla_cdc_log")):
             await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test_scylla_cdc_log WHERE \"cdc$stream_id\"=0xAB",
                                                 consistency_level=ConsistencyLevel.SERIAL))
+
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_lwt_shutdown(manager: ManagerClient):
+    """
+    This is a regression test for #26355:
+    * Start a cluster with two nodes (s0, s1) and a tablet-based table.
+    * Inject `paxos_state_learn_after_mutate` on s0 for this table.
+    * Inject `storage_proxy::stop` on s0 to ensure that s0 shutdown does not
+      complete before the LWT write on s0 finishes.
+    * Execute an LWT with `cl_learn=1` on s0 and wait for it to succeed.
+      This LWT leaves a background write to s0 storage.
+    * Begin s0 shutdown and wait until it reaches “storage proxy RPC verbs”,
+      but not “paxos store”. In other words, storage_proxy::remote should start
+      draining, but the paxos store should not yet be destroyed since it is still
+      in use.
+    * Release the `paxos_state_learn_after_mutate` and `storage_proxy::stop` injections.
+    * Verify that shutdown completes successfully.
+    * After s0 restarts, a node-local read on s0 must return the updated value.
+    """
+    cmdline = [
+        '--logger-log-level', 'paxos=trace'
+    ]
+    [s0, s1] = await manager.servers_add(2, cmdline=cmdline, property_file=[
+        {'dc': 'my_dc', 'rack': 'r1'},
+        {'dc': 'my_dc', 'rack': 'r2'}
+    ])
+    (cql, [h0, _]) = await manager.get_ready_cql([s0, s1])
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int)")
+
+        logger.info(f"Enable 'paxos_state_learn_after_mutate' injection on {s0.ip_addr}")
+        await manager.api.enable_injection(s0.ip_addr, 'paxos_state_learn_after_mutate',
+                                           False, parameters={'cf_name': 'test'})
+
+        logger.info("Run an LWT")
+        await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, v) VALUES (1, 2) IF NOT EXISTS",
+                                            consistency_level=ConsistencyLevel.ONE),
+                            host=h0)
+
+        logger.info("Open log")
+        log = await manager.server_open_log(s0.server_id)
+        logger.info("Wait for 'paxos_state_learn_after_mutate' injection")
+        await log.wait_for('paxos_state_learn_after_mutate: waiting for message')
+
+        logger.info(f"Enable 'storage_proxy::stop' injection on {s0.ip_addr}")
+        await manager.api.enable_injection(s0.ip_addr, 'storage_proxy::stop', True)
+
+        logger.info("Start node shutdown")
+        stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))
+        await log.wait_for('Shutting down storage proxy RPC verbs')
+        # assert len(await log.grep('Shutting down paxos store')) == 0
+
+        logger.info("Trigger paxos_state_learn_after_mutate and storage_proxy::stop")
+        await manager.api.message_injection(s0.ip_addr, "paxos_state_learn_after_mutate")
+        await manager.api.message_injection(s0.ip_addr, 'storage_proxy::stop')
+
+        logger.info("Waiting for paxos store shutdown")
+        await log.wait_for('Shutting down paxos store')
+
+        logger.info("Waiting for the node shutdown")
+        await stop_task
+
+        logger.info("Restarting server")
+        await manager.server_start(s0.server_id)
+
+        logger.info("Reconnecting the driver")
+        cql = await reconnect_driver(manager)
+
+        logger.info("Wait for cql and get hosts")
+        [h0, _] = await wait_for_cql_and_get_hosts(cql, [s0, s1], time.time() + 60)
+
+        logger.info("Run simple non-paxos read")
+        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
+                                                   consistency_level=ConsistencyLevel.ONE),
+                                   host=h0)
+        assert len(rows) == 1
+        row = rows[0]
+        assert row.pk == 1
+        assert row.v == 2