mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-03 05:26:58 +00:00
storage_service: cancel write handlers during drain to prevent shutdown deadlock
When a node shuts down, do_drain() calls stop_transport() which tears
down the messaging service. After this point, MUTATION_DONE responses
from replicas can no longer reach the coordinator, so any in-flight
write_response_handlers will never complete naturally. These handlers
hold ERMs referencing stale token_metadata versions.
If the topology coordinator calls barrier_and_drain (either on itself
or via RPC), it blocks in stale_versions_in_use() waiting for these
stale versions to be released. This causes:
- On the coordinator node: do_drain -> wait_for_group0_stop deadlock
(the topology coordinator fiber is stuck in barrier_and_drain).
- On non-coordinator nodes: ss::stop -> uninit_messaging_service
deadlock (the barrier_and_drain RPC handler holds the gate open).
Fix: cancel all write response handlers on all shards right after
stop_transport() in do_drain(). This releases their ERMs and the
associated stale token_metadata versions, unblocking
stale_versions_in_use().
Heap-allocate _write_handlers_gate and add an allow_new parameter to
cancel_all_write_response_handlers(). When allow_new=true (used by
do_drain), the gate is closed and swapped with a fresh one — existing
handlers are waited on while new handlers can still be created. This
avoids blocking internal writes (paxos learn, compaction history
updates) that still need to create handlers during the remainder of
the drain sequence. When allow_new=false (used by drain_on_shutdown),
the gate is closed permanently — no new handlers can be created after
final shutdown.
Update test_lwt_shutdown to wait for 'Stop transport: done' instead
of 'Shutting down storage proxy RPC verbs'. The latter message is
now only logged after do_drain() completes, but do_drain() blocks
in cancel_all_write_response_handlers() waiting for the background
paxos learn handler — which is exactly what the test needs to release
before shutdown can proceed.
Fixes: SCYLLADB-2163
Refs: scylladb/scylladb#23665
(cherry picked from commit 2927f0dd21)
This commit is contained in:
@@ -1753,7 +1753,7 @@ public:
|
||||
register_cancellable();
|
||||
}
|
||||
|
||||
attach_to(_proxy->_write_handlers_gate);
|
||||
attach_to(*_proxy->_write_handlers_gate);
|
||||
}
|
||||
void attach_to(gate& g) {
|
||||
_holders.push_back(g.hold());
|
||||
@@ -7474,7 +7474,7 @@ void storage_proxy::on_down(const gms::inet_address& endpoint, locator::host_id
|
||||
};
|
||||
|
||||
future<> storage_proxy::drain_on_shutdown() {
|
||||
co_await cancel_all_write_response_handlers();
|
||||
co_await cancel_all_write_response_handlers(false);
|
||||
co_await _hints_resource_manager.stop();
|
||||
}
|
||||
|
||||
@@ -7498,12 +7498,23 @@ future<utils::chunked_vector<dht::token_range_endpoints>> storage_proxy::describ
|
||||
return locator::describe_ring(_db.local(), _remote->gossiper(), keyspace, include_only_local_dc);
|
||||
}
|
||||
|
||||
future<> storage_proxy::cancel_all_write_response_handlers() {
|
||||
auto f = _write_handlers_gate.close();
|
||||
future<> storage_proxy::cancel_all_write_response_handlers(bool allow_new) {
|
||||
// Cancel all existing handlers. They are attached to the current gate.
|
||||
while (!_response_handlers.empty()) {
|
||||
_response_handlers.begin()->second->timeout_cb();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
// Close the gate and wait for destruction of handlers still alive via
|
||||
// external shared_ptr refs (e.g. background futures in send_to_live_endpoints).
|
||||
auto f = _write_handlers_gate->close();
|
||||
std::unique_ptr<gate> old_gate;
|
||||
if (allow_new) {
|
||||
// Swap in a fresh open gate so new handlers can be created.
|
||||
// Keep the old gate alive (via old_gate) until the close future resolves.
|
||||
old_gate = std::exchange(_write_handlers_gate, std::make_unique<gate>());
|
||||
}
|
||||
// If !allow_new, the closed gate stays in _write_handlers_gate — any
|
||||
// subsequent handler creation will get gate_closed_exception.
|
||||
co_await std::move(f);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -322,7 +322,7 @@ private:
|
||||
class cancellable_write_handlers_list;
|
||||
std::unique_ptr<cancellable_write_handlers_list> _cancellable_write_handlers_list;
|
||||
|
||||
gate _write_handlers_gate;
|
||||
std::unique_ptr<gate> _write_handlers_gate = std::make_unique<gate>();
|
||||
|
||||
/* This is a pointer to the shard-local part of the sharded cdc_service:
|
||||
* storage_proxy needs access to cdc_service to augment mutations.
|
||||
@@ -572,7 +572,7 @@ public:
|
||||
bool is_me(gms::inet_address addr) const noexcept;
|
||||
bool is_me(const locator::effective_replication_map& erm, locator::host_id id) const noexcept;
|
||||
|
||||
future<> cancel_all_write_response_handlers();
|
||||
future<> cancel_all_write_response_handlers(bool allow_new);
|
||||
|
||||
private:
|
||||
bool only_me(const locator::effective_replication_map& erm, const host_id_vector_replica_set& replicas) const noexcept;
|
||||
|
||||
@@ -2971,6 +2971,16 @@ future<> storage_service::do_drain() {
|
||||
// Need to stop transport before group0, otherwise RPCs may fail with raft_group_not_found.
|
||||
co_await stop_transport();
|
||||
|
||||
// Cancel write handlers on all shards so they release their ERMs
|
||||
// (and the associated token_metadata versions). Without this,
|
||||
// wait_for_group0_stop below may deadlock: the topology coordinator
|
||||
// fiber calls barrier_and_drain which blocks in stale_versions_in_use()
|
||||
// waiting for stale token_metadata versions held by write handlers
|
||||
// whose MUTATION_DONE responses can no longer arrive (transport is stopped).
|
||||
co_await _qp.proxy().container().invoke_on_all([] (storage_proxy& sp) {
|
||||
return sp.cancel_all_write_response_handlers(true);
|
||||
});
|
||||
|
||||
// Drain view builder before group0, because the view builder uses group0 to coordinate view building.
|
||||
// Drain after transport is stopped, because view_builder::drain aborts view writes for user writes as well.
|
||||
co_await _view_builder.invoke_on_all(&db::view::view_builder::drain);
|
||||
|
||||
@@ -840,7 +840,11 @@ async def test_lwt_shutdown(manager: ManagerClient):
|
||||
|
||||
logger.info("Start node shutdown")
|
||||
stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))
|
||||
await log.wait_for('Shutting down storage proxy RPC verbs')
|
||||
# Wait for stop_transport to complete. After this, do_drain() will
|
||||
# block in cancel_all_write_response_handlers() waiting for the
|
||||
# paxos learn's background write handler to be released. The paxos
|
||||
# store is still alive at this point.
|
||||
await log.wait_for('Stop transport: done')
|
||||
assert len(await log.grep('Shutting down paxos store')) == 0
|
||||
|
||||
logger.info("Trigger paxos_state_learn_after_mutate")
|
||||
|
||||
@@ -24,7 +24,6 @@ from cassandra.cluster import ConnectionException, NoHostAvailable # type: igno
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.xfail(reason="SCYLLADB-1842")
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_unfinished_writes_during_shutdown(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
||||
""" Test a simultaneous topology change and write query during shutdown,
|
||||
|
||||
Reference in New Issue
Block a user