From a2b2a42936cefb61a1ccad4dc5e7eecd46b2c822 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Wed, 13 May 2026 11:59:51 +0200 Subject: [PATCH] storage_service: cancel write handlers during drain to prevent shutdown deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a node shuts down, do_drain() calls stop_transport() which tears down the messaging service. After this point, MUTATION_DONE responses from replicas can no longer reach the coordinator, so any in-flight write_response_handlers will never complete naturally. These handlers hold ERMs referencing stale token_metadata versions. If the topology coordinator calls barrier_and_drain (either on itself or via RPC), it blocks in stale_versions_in_use() waiting for these stale versions to be released. This causes: - On the coordinator node: do_drain -> wait_for_group0_stop deadlock (the topology coordinator fiber is stuck in barrier_and_drain). - On non-coordinator nodes: ss::stop -> uninit_messaging_service deadlock (the barrier_and_drain RPC handler holds the gate open). Fix: cancel all write response handlers on all shards right after stop_transport() in do_drain(). This releases their ERMs and the associated stale token_metadata versions, unblocking stale_versions_in_use(). Heap-allocate _write_handlers_gate and add an allow_new parameter to cancel_all_write_response_handlers(). When allow_new=true (used by do_drain), the gate is closed and swapped with a fresh one — existing handlers are waited on while new handlers can still be created. This avoids blocking internal writes (paxos learn, compaction history updates) that still need to create handlers during the remainder of the drain sequence. When allow_new=false (used by drain_on_shutdown), the gate is closed permanently — no new handlers can be created after final shutdown. Update test_lwt_shutdown to wait for 'Stop transport: done' instead of 'Shutting down storage proxy RPC verbs'. The latter message is now only logged after do_drain() completes, but do_drain() blocks in cancel_all_write_response_handlers() waiting for the background paxos learn handler — which is exactly what the test needs to release before shutdown can proceed. Fixes: SCYLLADB-2163 Refs: scylladb/scylladb#23665 (cherry picked from commit 2927f0dd2131ba6ee6a887d119e0a74d804177ea) --- service/storage_proxy.cc | 19 +++++++++++++++---- service/storage_proxy.hh | 4 ++-- service/storage_service.cc | 10 ++++++++++ test/cluster/test_tablets_lwt.py | 6 +++++- .../test_unfinished_writes_during_shutdown.py | 1 - 5 files changed, 32 insertions(+), 8 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 774ea1dec2..e43d61de8e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1753,7 +1753,7 @@ public: register_cancellable(); } - attach_to(_proxy->_write_handlers_gate); + attach_to(*_proxy->_write_handlers_gate); } void attach_to(gate& g) { _holders.push_back(g.hold()); @@ -7474,7 +7474,7 @@ void storage_proxy::on_down(const gms::inet_address& endpoint, locator::host_id }; future<> storage_proxy::drain_on_shutdown() { - co_await cancel_all_write_response_handlers(); + co_await cancel_all_write_response_handlers(false); co_await _hints_resource_manager.stop(); } @@ -7498,12 +7498,23 @@ future> storage_proxy::describ return locator::describe_ring(_db.local(), _remote->gossiper(), keyspace, include_only_local_dc); } -future<> storage_proxy::cancel_all_write_response_handlers() { - auto f = _write_handlers_gate.close(); +future<> storage_proxy::cancel_all_write_response_handlers(bool allow_new) { + // Cancel all existing handlers. They are attached to the current gate. while (!_response_handlers.empty()) { _response_handlers.begin()->second->timeout_cb(); co_await coroutine::maybe_yield(); } + // Close the gate and wait for destruction of handlers still alive via + // external shared_ptr refs (e.g. background futures in send_to_live_endpoints). + auto f = _write_handlers_gate->close(); + std::unique_ptr old_gate; + if (allow_new) { + // Swap in a fresh open gate so new handlers can be created. + // Keep the old gate alive (via old_gate) until the close future resolves. + old_gate = std::exchange(_write_handlers_gate, std::make_unique()); + } + // If !allow_new, the closed gate stays in _write_handlers_gate — any + // subsequent handler creation will get gate_closed_exception. co_await std::move(f); } } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index f762f5dc0d..731354e974 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -322,7 +322,7 @@ private: class cancellable_write_handlers_list; std::unique_ptr _cancellable_write_handlers_list; - gate _write_handlers_gate; + std::unique_ptr _write_handlers_gate = std::make_unique(); /* This is a pointer to the shard-local part of the sharded cdc_service: * storage_proxy needs access to cdc_service to augment mutations. @@ -572,7 +572,7 @@ public: bool is_me(gms::inet_address addr) const noexcept; bool is_me(const locator::effective_replication_map& erm, locator::host_id id) const noexcept; - future<> cancel_all_write_response_handlers(); + future<> cancel_all_write_response_handlers(bool allow_new); private: bool only_me(const locator::effective_replication_map& erm, const host_id_vector_replica_set& replicas) const noexcept; diff --git a/service/storage_service.cc b/service/storage_service.cc index 14815ff3fa..3ba31499f6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2971,6 +2971,16 @@ future<> storage_service::do_drain() { // Need to stop transport before group0, otherwise RPCs may fail with raft_group_not_found. co_await stop_transport(); + // Cancel write handlers on all shards so they release their ERMs + // (and the associated token_metadata versions). Without this, + // wait_for_group0_stop below may deadlock: the topology coordinator + // fiber calls barrier_and_drain which blocks in stale_versions_in_use() + // waiting for stale token_metadata versions held by write handlers + // whose MUTATION_DONE responses can no longer arrive (transport is stopped). + co_await _qp.proxy().container().invoke_on_all([] (storage_proxy& sp) { + return sp.cancel_all_write_response_handlers(true); + }); + // Drain view builder before group0, because the view builder uses group0 to coordinate view building. // Drain after transport is stopped, because view_builder::drain aborts view writes for user writes as well. co_await _view_builder.invoke_on_all(&db::view::view_builder::drain); diff --git a/test/cluster/test_tablets_lwt.py b/test/cluster/test_tablets_lwt.py index 95418a6104..cf2c165fb4 100644 --- a/test/cluster/test_tablets_lwt.py +++ b/test/cluster/test_tablets_lwt.py @@ -840,7 +840,11 @@ async def test_lwt_shutdown(manager: ManagerClient): logger.info("Start node shutdown") stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id)) - await log.wait_for('Shutting down storage proxy RPC verbs') + # Wait for stop_transport to complete. After this, do_drain() will + # block in cancel_all_write_response_handlers() waiting for the + # paxos learn's background write handler to be released. The paxos + # store is still alive at this point. + await log.wait_for('Stop transport: done') assert len(await log.grep('Shutting down paxos store')) == 0 logger.info("Trigger paxos_state_learn_after_mutate") diff --git a/test/cluster/test_unfinished_writes_during_shutdown.py b/test/cluster/test_unfinished_writes_during_shutdown.py index 00f21b8956..36819bfd3c 100644 --- a/test/cluster/test_unfinished_writes_during_shutdown.py +++ b/test/cluster/test_unfinished_writes_during_shutdown.py @@ -24,7 +24,6 @@ from cassandra.cluster import ConnectionException, NoHostAvailable # type: igno logger = logging.getLogger(__name__) @pytest.mark.asyncio -@pytest.mark.xfail(reason="SCYLLADB-1842") @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') async def test_unfinished_writes_during_shutdown(request: pytest.FixtureRequest, manager: ManagerClient) -> None: """ Test a simultaneous topology change and write query during shutdown,