storage_proxy: only cancel write handlers with pending remote targets during drain

The previous fix (cancel_all_write_response_handlers in do_drain)
was too aggressive — it killed all handlers including ones used by
group0 for raft commits. Since group0 is still running at that point
(before wait_for_group0_stop), this caused group0 operations to fail
(SCYLLADB-2168).

The actual problem is only with handlers that have pending remote
targets: after stop_transport() their MUTATION_DONE responses can
never arrive via messaging. Handlers whose only pending targets are
local can still complete via apply_locally and should be left alone.

Add cancel_nonlocal_write_response_handlers() which checks each
handler's remaining targets against the local host ID. Only handlers
with at least one remote pending target are cancelled. Use it in
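do_drain instead of cancel_all_write_response_handlers. The latter
is still used by drain_on_shutdown (final proxy shutdown where all
handlers must be killed); it drops the allow_new parameter, and
_write_handlers_gate becomes a plain gate since it is no longer
swapped for a fresh one.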

Fixes: SCYLLADB-2168

Closes scylladb/scylladb#30020
This commit is contained in:
Petr Gusev
2026-05-22 13:28:49 +02:00
committed by Patryk Jędrzejczak
parent 6bffc0d2e0
commit 954426407e
3 changed files with 43 additions and 20 deletions

View File

@@ -1753,7 +1753,7 @@ public:
register_cancellable();
}
attach_to(*_proxy->_write_handlers_gate);
attach_to(_proxy->_write_handlers_gate);
}
void attach_to(gate& g) {
_holders.push_back(g.hold());
@@ -7477,7 +7477,7 @@ void storage_proxy::on_down(const gms::inet_address& endpoint, locator::host_id
};
future<> storage_proxy::drain_on_shutdown() {
co_await cancel_all_write_response_handlers(false);
co_await cancel_all_write_response_handlers();
co_await _hints_resource_manager.stop();
}
@@ -7501,23 +7501,42 @@ future<utils::chunked_vector<dht::token_range_endpoints>> storage_proxy::describ
return locator::describe_ring(_db.local(), _remote->gossiper(), keyspace, include_only_local_dc);
}
future<> storage_proxy::cancel_all_write_response_handlers(bool allow_new) {
// Cancel all existing handlers. They are attached to the current gate.
future<> storage_proxy::cancel_all_write_response_handlers() {
auto f = _write_handlers_gate.close();
while (!_response_handlers.empty()) {
_response_handlers.begin()->second->timeout_cb();
co_await coroutine::maybe_yield();
}
// Close the gate and wait for destruction of handlers still alive via
// external shared_ptr refs (e.g. background futures in send_to_live_endpoints).
auto f = _write_handlers_gate->close();
std::unique_ptr<gate> old_gate;
if (allow_new) {
// Swap in a fresh open gate so new handlers can be created.
// Keep the old gate alive (via old_gate) until the close future resolves.
old_gate = std::exchange(_write_handlers_gate, std::make_unique<gate>());
}
// If !allow_new, the closed gate stays in _write_handlers_gate — any
// subsequent handler creation will get gate_closed_exception.
co_await std::move(f);
}
future<> storage_proxy::cancel_nonlocal_write_response_handlers() {
    // Once stop_transport() has run, MUTATION_DONE replies from remote
    // nodes cannot be delivered any more, so a handler that still waits
    // on a remote target would never finish; invoke timeout_cb() on those.
    // A handler whose pending targets are all local is not touched: it
    // can still be completed through apply_locally.

    // Pass 1: record the ids of the handlers to cancel. Cancelling while
    // walking _response_handlers could invalidate the iterator, so only
    // ids are remembered here. This pass contains no co_await.
    std::vector<response_id_type> stuck_ids;
    stuck_ids.reserve(_response_handlers.size());
    for (auto& [rid, h] : _response_handlers) {
        // True as soon as one pending target is not this node.
        const bool has_remote_target = std::ranges::any_of(h->get_targets(), [&] (locator::host_id target) {
            return !is_me(*h->_effective_replication_map_ptr, target);
        });
        if (has_remote_target) {
            stuck_ids.push_back(rid);
        }
    }

    // Pass 2: cancel. Each handler takes a hold on a function-local gate
    // (via attach_to) before its timeout_cb() is invoked, so closing the
    // gate at the end waits until all of those holds have been released.
    gate cancelled;
    for (const auto rid : stuck_ids) {
        // Look the id up again: this loop yields between iterations, so
        // the handler may already be gone from the map.
        if (const auto pos = _response_handlers.find(rid); pos != _response_handlers.end()) {
            pos->second->attach_to(cancelled);
            pos->second->timeout_cb();
        }
        co_await coroutine::maybe_yield();
    }
    co_await cancelled.close();
}
}

View File

@@ -327,7 +327,7 @@ private:
class cancellable_write_handlers_list;
std::unique_ptr<cancellable_write_handlers_list> _cancellable_write_handlers_list;
std::unique_ptr<gate> _write_handlers_gate = std::make_unique<gate>();
gate _write_handlers_gate;
/* This is a pointer to the shard-local part of the sharded cdc_service:
* storage_proxy needs access to cdc_service to augment mutations.
@@ -577,7 +577,8 @@ public:
bool is_me(gms::inet_address addr) const noexcept;
bool is_me(const locator::effective_replication_map& erm, locator::host_id id) const noexcept;
future<> cancel_all_write_response_handlers(bool allow_new);
future<> cancel_all_write_response_handlers();
future<> cancel_nonlocal_write_response_handlers();
private:
bool only_me(const locator::effective_replication_map& erm, const host_id_vector_replica_set& replicas) const noexcept;

View File

@@ -2970,14 +2970,17 @@ future<> storage_service::do_drain() {
// Need to stop transport before group0, otherwise RPCs may fail with raft_group_not_found.
co_await stop_transport();
// Cancel write handlers on all shards so they release their ERMs
// (and the associated token_metadata versions). Without this,
// Cancel write handlers that have pending remote targets so they release
// their ERMs (and the associated token_metadata versions). Without this,
// wait_for_group0_stop below may deadlock: the topology coordinator
// fiber calls barrier_and_drain which blocks in stale_versions_in_use()
// waiting for stale token_metadata versions held by write handlers
// whose MUTATION_DONE responses can no longer arrive (transport is stopped).
// Handlers with only local pending targets are left alone — they can
// still complete via apply_locally, and group0 needs them for raft commits
// until wait_for_group0_stop below.
co_await _qp.proxy().container().invoke_on_all([] (storage_proxy& sp) {
return sp.cancel_all_write_response_handlers(true);
return sp.cancel_nonlocal_write_response_handlers();
});
// Drain view builder before group0, because the view builder uses group0 to coordinate view building.