diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 774ea1dec2..e43d61de8e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1753,7 +1753,7 @@ public: register_cancellable(); } - attach_to(_proxy->_write_handlers_gate); + attach_to(*_proxy->_write_handlers_gate); } void attach_to(gate& g) { _holders.push_back(g.hold()); @@ -7474,7 +7474,7 @@ void storage_proxy::on_down(const gms::inet_address& endpoint, locator::host_id }; future<> storage_proxy::drain_on_shutdown() { - co_await cancel_all_write_response_handlers(); + co_await cancel_all_write_response_handlers(false); co_await _hints_resource_manager.stop(); } @@ -7498,12 +7498,23 @@ future> storage_proxy::describ return locator::describe_ring(_db.local(), _remote->gossiper(), keyspace, include_only_local_dc); } -future<> storage_proxy::cancel_all_write_response_handlers() { - auto f = _write_handlers_gate.close(); +future<> storage_proxy::cancel_all_write_response_handlers(bool allow_new) { + // Cancel all existing handlers. They are attached to the current gate. while (!_response_handlers.empty()) { _response_handlers.begin()->second->timeout_cb(); co_await coroutine::maybe_yield(); } + // Close the gate and wait for destruction of handlers still alive via + // external shared_ptr refs (e.g. background futures in send_to_live_endpoints). + auto f = _write_handlers_gate->close(); + std::unique_ptr<gate> old_gate; + if (allow_new) { + // Swap in a fresh open gate so new handlers can be created. + // Keep the old gate alive (via old_gate) until the close future resolves. + old_gate = std::exchange(_write_handlers_gate, std::make_unique<gate>()); + } + // If !allow_new, the closed gate stays in _write_handlers_gate — any + // subsequent handler creation will get gate_closed_exception. 
co_await std::move(f); } } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index f762f5dc0d..731354e974 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -322,7 +322,7 @@ private: class cancellable_write_handlers_list; std::unique_ptr<cancellable_write_handlers_list> _cancellable_write_handlers_list; - gate _write_handlers_gate; + std::unique_ptr<gate> _write_handlers_gate = std::make_unique<gate>(); /* This is a pointer to the shard-local part of the sharded cdc_service: * storage_proxy needs access to cdc_service to augment mutations. @@ -572,7 +572,7 @@ public: bool is_me(gms::inet_address addr) const noexcept; bool is_me(const locator::effective_replication_map& erm, locator::host_id id) const noexcept; - future<> cancel_all_write_response_handlers(); + future<> cancel_all_write_response_handlers(bool allow_new); private: bool only_me(const locator::effective_replication_map& erm, const host_id_vector_replica_set& replicas) const noexcept; diff --git a/service/storage_service.cc b/service/storage_service.cc index 14815ff3fa..3ba31499f6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2971,6 +2971,16 @@ future<> storage_service::do_drain() { // Need to stop transport before group0, otherwise RPCs may fail with raft_group_not_found. co_await stop_transport(); + // Cancel write handlers on all shards so they release their ERMs + // (and the associated token_metadata versions). Without this, + // wait_for_group0_stop below may deadlock: the topology coordinator + // fiber calls barrier_and_drain which blocks in stale_versions_in_use() + // waiting for stale token_metadata versions held by write handlers + // whose MUTATION_DONE responses can no longer arrive (transport is stopped). + co_await _qp.proxy().container().invoke_on_all([] (storage_proxy& sp) { + return sp.cancel_all_write_response_handlers(true); + }); + // Drain view builder before group0, because the view builder uses group0 to coordinate view building. 
// Drain after transport is stopped, because view_builder::drain aborts view writes for user writes as well. co_await _view_builder.invoke_on_all(&db::view::view_builder::drain); diff --git a/test/cluster/test_tablets_lwt.py b/test/cluster/test_tablets_lwt.py index 95418a6104..cf2c165fb4 100644 --- a/test/cluster/test_tablets_lwt.py +++ b/test/cluster/test_tablets_lwt.py @@ -840,7 +840,11 @@ async def test_lwt_shutdown(manager: ManagerClient): logger.info("Start node shutdown") stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id)) - await log.wait_for('Shutting down storage proxy RPC verbs') + # Wait for stop_transport to complete. After this, do_drain() will + # block in cancel_all_write_response_handlers() waiting for the + # paxos learn's background write handler to be released. The paxos + # store is still alive at this point. + await log.wait_for('Stop transport: done') assert len(await log.grep('Shutting down paxos store')) == 0 logger.info("Trigger paxos_state_learn_after_mutate") diff --git a/test/cluster/test_unfinished_writes_during_shutdown.py b/test/cluster/test_unfinished_writes_during_shutdown.py index 00f21b8956..36819bfd3c 100644 --- a/test/cluster/test_unfinished_writes_during_shutdown.py +++ b/test/cluster/test_unfinished_writes_during_shutdown.py @@ -24,7 +24,6 @@ from cassandra.cluster import ConnectionException, NoHostAvailable # type: igno logger = logging.getLogger(__name__) @pytest.mark.asyncio -@pytest.mark.xfail(reason="SCYLLADB-1842") @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') async def test_unfinished_writes_during_shutdown(request: pytest.FixtureRequest, manager: ManagerClient) -> None: """ Test a simultaneous topology change and write query during shutdown,