test: add tests to check failed-migration virtual tasks

This commit is contained in:
Aleksandra Martyniuk
2024-12-04 17:05:05 +01:00
parent be8dfd220f
commit bc17535427
3 changed files with 51 additions and 3 deletions

View File

@@ -94,6 +94,7 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
co_return std::nullopt;
}
tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished");
co_await _ss._topology_state_machine.event.wait([&] {
auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table);
if (tablet_id_opt.has_value()) {

View File

@@ -1404,14 +1404,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't
// get admitted before global_tablet_token_metadata_barrier() is finished for earlier
// stage in case of coordinator failover.
case locator::tablet_transition_stage::streaming:
case locator::tablet_transition_stage::streaming: {
if (drain) {
utils::get_local_injector().inject("stream_tablet_fail_on_drain",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
}
if (action_failed(tablet_state.streaming)) {
if (check_excluded_replicas()) {
bool cleanup = utils::get_local_injector().enter("stream_tablet_move_to_cleanup");
if (cleanup || check_excluded_replicas()) {
if (do_barrier()) {
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::cleanup_target);
updates.emplace_back(get_mutation_builder()
@@ -1423,7 +1424,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
}
if (advance_in_background(gid, tablet_state.streaming, "streaming", [&] {
bool wait = utils::get_local_injector().enter("stream_tablet_wait");
if (!wait && advance_in_background(gid, tablet_state.streaming, "streaming", [&] {
utils::get_local_injector().inject("stream_tablet_move_to_cleanup",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
if (!trinfo.pending_replica) {
rtlogger.info("Skipped tablet streaming ({}) of {} as no pending replica found", trinfo.transition, gid);
return make_ready_future<>();
@@ -1439,6 +1444,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del_session(last_token)
.build());
}
}
break;
case locator::tablet_transition_stage::write_both_read_new: {
utils::get_local_injector().inject("crash-in-tablet-write-both-read-new", [] {

View File

@@ -239,3 +239,44 @@ async def test_tablet_migration_task_list(manager: ManagerClient):
migration_dst = (host_ids[1], 0)
await enable_injection(manager, servers, injection)
await asyncio.gather(move_tablet(servers[0], migration_src, migration_dst), check_migration_task_list("migration"))
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_migration_task_failed(manager: ManagerClient):
    """Verify that a tablet migration virtual task ends in the "failed" state
    when streaming throws.

    Flow: arm "stream_tablet_wait" to park streaming and
    "stream_tablet_move_to_cleanup" to make it throw, start an intranode
    tablet migration, wait for the virtual task to show up, release the
    parked streaming, and assert the task finishes as failed.
    """
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    servers, _host_ids = await prepare_migration_test(manager)

    # First injection parks streaming; second makes it throw once resumed.
    wait_injection = "stream_tablet_wait"
    throw_injection = "stream_tablet_move_to_cleanup"

    async def move_tablet(old_replica, new_replica):
        # Move one tablet of test.test between the given (host, shard) replicas.
        await manager.api.move_tablet(servers[0].ip_addr, "test", "test",
                                      old_replica[0], old_replica[1],
                                      new_replica[0], new_replica[1], 0)

    async def wait_for_task(task_id, task_type):
        # The task must complete with "failed" status (the throw injection fired).
        status = await tm.wait_for_task(servers[0].ip_addr, task_id)
        check_task_status(status, ["failed"], task_type, "tablet", False)

    async def resume_migration(log, mark):
        # Release the parked streaming once the virtual task starts waiting.
        await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
        await disable_injection(manager, servers, wait_injection)

    async def check(task_type, log, mark):
        # Wait until migration task is created.
        migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, task_type)
        assert len(migration_tasks) == 1
        await asyncio.gather(wait_for_task(migration_tasks[0].task_id, task_type),
                             resume_migration(log, mark))

    await enable_injection(manager, servers, wait_injection)
    await enable_injection(manager, servers, throw_injection)

    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()

    replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
    assert len(replicas) == 1 and len(replicas[0].replicas) == 1
    src = replicas[0].replicas[0]
    # Same host, other shard -> this is an intranode migration.
    dst = (src[0], 1 - src[1])
    await asyncio.gather(move_tablet(src, dst), check("intranode_migration", log, mark))