diff --git a/repair/repair.cc b/repair/repair.cc index ed5e4e4739..c20e8021f4 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -2429,7 +2429,8 @@ future<> repair::tablet_repair_task_impl::run() { throw abort_requested_exception(); } - co_await utils::get_local_injector().inject("repair_tablet_repair_task_impl_run", utils::wait_for_message(10s)); + co_await utils::get_local_injector().inject("repair_tablet_repair_task_impl_run", + utils::wait_for_message(10s, &rs.get_repair_module().abort_source())); std::unordered_map neighbors; neighbors[m.range] = m.neighbors; diff --git a/service/task_manager_module.cc b/service/task_manager_module.cc index 2f906af190..bd6947157c 100644 --- a/service/task_manager_module.cc +++ b/service/task_manager_module.cc @@ -150,38 +150,50 @@ future> tablet_virtual_task::wait(tasks::task_ tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished"); co_await utils::get_local_injector().inject("tablet_virtual_task_wait", utils::wait_for_message(60s)); - while (true) { - co_await _ss._topology_state_machine.event.wait([&] { - if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) { - return true; - } - auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table); - if (is_resize_task(task_type)) { // Resize task. - return tmap.resize_task_info().tablet_task_id.uuid() != id.uuid(); - } else if (tablet_id_opt.has_value()) { // Migration task. - return tmap.get_tablet_info(tablet_id_opt.value()).migration_task_info.tablet_task_id.uuid() != id.uuid(); - } else { // Repair task. - return true; - } - }); - - if (!is_repair_task(task_type)) { - break; - } + auto task_finished = [&] () -> future { auto tmptr = _ss.get_token_metadata_ptr(); - if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) { - break; + if (!tmptr->tablets().has_tablet_map(table)) { + co_return true; } auto& tmap = tmptr->tablets().get_tablet_map(table); - bool repair_still_running = false; - co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) { - repair_still_running = repair_still_running || (info.repair_task_info.is_valid() && info.repair_task_info.tablet_task_id.uuid() == id.uuid()); - return make_ready_future(); - }); - if (!repair_still_running) { + if (is_repair_task(task_type)) { + bool running = false; + co_await tmap.for_each_tablet([&] (locator::tablet_id, const locator::tablet_info& info) { + if (info.repair_task_info.is_valid() && info.repair_task_info.tablet_task_id.uuid() == id.uuid()) { + running = true; + } + return make_ready_future(); + }); + co_return !running; + } + if (is_resize_task(task_type)) { + co_return tmap.resize_task_info().tablet_task_id.uuid() != id.uuid(); + } + if (is_migration_task(task_type)) { + co_return tmap.get_tablet_info(tablet_id_opt.value()).migration_task_info.tablet_task_id.uuid() != id.uuid(); + } + on_internal_error(tasks::tmlogger, fmt::format("tablet_virtual_task::wait: unhandled tablet task type {}", task_type)); + }; + + // The repair check (task_finished) is async — for_each_tablet scans + // every tablet — so we cannot use condition_variable::wait(Pred) which + // requires a synchronous predicate. + // + // Register a waiter via event.wait() *before* running the async check + // to avoid missing a broadcast that fires while task_finished() yields. + // If the task is finished we discard the future (the stale waiter is + // cleaned up on the next broadcast or broken()). Otherwise we co_await + // the future, which is guaranteed to capture any broadcast that occurred + // during the check. event.broken() during shutdown propagates + // broken_condition_variable and unblocks the loop promptly. + while (true) { + auto f = _ss._topology_state_machine.event.wait(); + if (co_await task_finished()) { + (void)f.handle_exception([] (auto&&) {}); break; } + co_await std::move(f); } res->status.state = tasks::task_manager::task_state::done; // Failed repair task is retried. diff --git a/test/cluster/tasks/test_tablet_tasks.py b/test/cluster/tasks/test_tablet_tasks.py index c9a82a722b..d27ff29d6c 100644 --- a/test/cluster/tasks/test_tablet_tasks.py +++ b/test/cluster/tasks/test_tablet_tasks.py @@ -623,3 +623,72 @@ async def test_tablet_task_sees_latest_state(manager: ManagerClient): await manager.api.abort_task(servers[0].ip_addr, tablet_task_id) await asyncio.gather(repair_task(), del_repair_task()) + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode") +async def test_tablet_repair_wait_task_shutdown(manager: ManagerClient): + """Reproducer for SCYLLADB-1532. + + tablet_virtual_task::wait() for repair tasks used a condition variable + predicate that always returned true, so event.wait(pred) never actually + suspended. The while(true) loop busy-spun and event.broken() had no + effect because no real waiter was registered on the CV. During + shutdown, the HTTP server's task gate stayed open, blocking + http_server::stop() until systemd killed the process with SIGABRT. + + We reproduce the hang deterministically: + 1. Pause the repair inside repair_tablet_repair_task_impl_run so it + never completes on its own. + 2. Launch a wait_for_task HTTP call which enters + tablet_virtual_task::wait() and parks in the CV loop. + 3. Ask the server to stop gracefully, WITHOUT releasing the repair + pause. + + Before the fix the busy-spin in wait() blocks http_server::stop(); + graceful shutdown never finishes and the test times out. After the + fix, event.broken() -- fired via the storage_service abort_source + subscription -- wakes the real waiter parked on + _topology_state_machine.event, wait() returns, the HTTP task gate + closes, and the module's abort_source aborts the paused + wait_for_message so the repair task exits cleanly. + """ + module_name = "tablets" + tm = TaskManagerClient(manager.api) + stop_repair_injection = "repair_tablet_repair_task_impl_run" + + servers, _, _, ks, _ = await create_table_insert_data_for_repair(manager) + assert module_name in await tm.list_modules(servers[0].ip_addr) + + # Hold repair so the task stays in 'running' state throughout the test. + # The injection's wait_for_message is wired to the repair module's + # abort_source, so shutdown will release it with an abort exception + # rather than tripping the internal-error timeout path. + await inject_error_on(manager, stop_repair_injection, servers) + await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", await_completion=False) + + repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=ks) + task = repair_tasks[0] + + log = await manager.server_open_log(servers[0].server_id) + mark = await log.mark() + + # Start wait_for_task which enters tablet_virtual_task::wait(). + # Because repair is paused for the entire test, task_finished() can + # never return true, so the only way out of wait() is via + # event.broken() during shutdown -- which must surface as an + # exception on the client side (connection closed / server error). + # A normal return would indicate a regression (e.g. a stale "done" + # status leaking out during shutdown), so we assert one was raised. + wait_task = asyncio.create_task(tm.wait_for_task(servers[0].ip_addr, task.task_id)) + + # Make sure wait() has entered its loop before we initiate shutdown. + await log.wait_for("tablet_virtual_task: wait until tablet operation is finished", from_mark=mark) + + # Stop the server gracefully with the repair still paused. With the + # fix this completes promptly. Without the fix the busy-spin loop + # holds the HTTP task gate open and graceful shutdown times out. + await manager.server_stop_gracefully(servers[0].server_id, timeout=30) + + with pytest.raises(Exception): + await wait_task