diff --git a/service/task_manager_module.cc b/service/task_manager_module.cc index 996846f622..a7ff4503c8 100644 --- a/service/task_manager_module.cc +++ b/service/task_manager_module.cc @@ -94,6 +94,7 @@ future> tablet_virtual_task::wait(tasks::task_ co_return std::nullopt; } + tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished"); co_await _ss._topology_state_machine.event.wait([&] { auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table); if (tablet_id_opt.has_value()) { diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index b21923fbf9..315b03873e 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1404,14 +1404,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't // get admitted before global_tablet_token_metadata_barrier() is finished for earlier // stage in case of coordinator failover. 
- case locator::tablet_transition_stage::streaming: + case locator::tablet_transition_stage::streaming: { if (drain) { utils::get_local_injector().inject("stream_tablet_fail_on_drain", [] { throw std::runtime_error("stream_tablet failed due to error injection"); }); } if (action_failed(tablet_state.streaming)) { - if (check_excluded_replicas()) { + bool cleanup = utils::get_local_injector().enter("stream_tablet_move_to_cleanup"); + if (cleanup || check_excluded_replicas()) { if (do_barrier()) { rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::cleanup_target); updates.emplace_back(get_mutation_builder() @@ -1423,7 +1424,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } } - if (advance_in_background(gid, tablet_state.streaming, "streaming", [&] { + bool wait = utils::get_local_injector().enter("stream_tablet_wait"); + if (!wait && advance_in_background(gid, tablet_state.streaming, "streaming", [&] { + utils::get_local_injector().inject("stream_tablet_move_to_cleanup", + [] { throw std::runtime_error("stream_tablet failed due to error injection"); }); + if (!trinfo.pending_replica) { rtlogger.info("Skipped tablet streaming ({}) of {} as no pending replica found", trinfo.transition, gid); return make_ready_future<>(); @@ -1439,6 +1444,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { .del_session(last_token) .build()); } + } break; case locator::tablet_transition_stage::write_both_read_new: { utils::get_local_injector().inject("crash-in-tablet-write-both-read-new", [] { diff --git a/test/topology_tasks/test_tablet_tasks.py b/test/topology_tasks/test_tablet_tasks.py index 991cc6f51e..3636bfc584 100644 --- a/test/topology_tasks/test_tablet_tasks.py +++ b/test/topology_tasks/test_tablet_tasks.py @@ -239,3 +239,44 @@ async def test_tablet_migration_task_list(manager: ManagerClient): migration_dst = (host_ids[1], 0) await enable_injection(manager, servers, injection) await 
asyncio.gather(move_tablet(servers[0], migration_src, migration_dst), check_migration_task_list("migration"))
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_tablet_migration_task_failed(manager: ManagerClient):  # A tablet move whose streaming is made to throw; its task is checked against ["failed"].
+    module_name = "tablets"
+    tm = TaskManagerClient(manager.api)
+    servers, host_ids = await prepare_migration_test(manager)
+
+    wait_injection = "stream_tablet_wait"  # While entered, the coordinator's streaming stage skips advance_in_background().
+    throw_injection = "stream_tablet_move_to_cleanup"  # Throws inside the streaming action; on failure routes the tablet to cleanup_target.
+
+    async def move_tablet(old_replica, new_replica):  # Replicas are indexed [0]/[1]; presumably (host_id, shard) - confirm.
+        await manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
+
+    async def wait_for_task(task_id, type):  # NOTE(review): `type` shadows the Python builtin inside this helper.
+        status = await tm.wait_for_task(servers[0].ip_addr, task_id)
+        check_task_status(status, ["failed"], type, "tablet", False)  # Presumably asserts the state is one of ["failed"] - confirm in helper.
+
+    async def resume_migration(log, mark):  # Releases streaming only once tablet_virtual_task::wait() has logged that it is about to wait.
+        await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
+        await disable_injection(manager, servers, wait_injection)
+
+    async def check(type, log, mark):  # Finds the single task of `type`, then waits on it while resuming the migration.
+        # Wait until migration task is created.
+        migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type)
+        assert len(migration_tasks) == 1
+
+        await asyncio.gather(wait_for_task(migration_tasks[0].task_id, type), resume_migration(log, mark))  # Run concurrently: the waiter must be blocked before the injection is lifted.
+
+    await enable_injection(manager, servers, wait_injection)  # Both injections are armed before the move is requested.
+    await enable_injection(manager, servers, throw_injection)
+
+    log = await manager.server_open_log(servers[0].server_id)
+    mark = await log.mark()  # Taken before the move starts; passed as from_mark to log.wait_for() in resume_migration().
+
+    replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
+    assert len(replicas) == 1 and len(replicas[0].replicas) == 1  # The test relies on exactly one tablet with exactly one replica.
+
+    src = replicas[0].replicas[0]
+    dst = (src[0], 1 - src[1])  # Same first element, second element flipped 0<->1; presumably same host, other shard - confirm.
+    await asyncio.gather(move_tablet(src, dst), check("intranode_migration", log, mark))  # Start the move and the task check concurrently.