diff --git a/sstables_loader.cc b/sstables_loader.cc
index 0c7adf47b5..351db04709 100644
--- a/sstables_loader.cc
+++ b/sstables_loader.cc
@@ -986,6 +986,8 @@ future<> sstables_loader::download_tablet_sstables(locator::global_tablet_id tid
         throw std::logic_error("sstables_partially_contained");
     }
     llog.debug("{} SSTables filtered by range {} for tablet {}", fully.size(), tablet_range, tid);
+    co_await utils::get_local_injector().inject("pause_tablet_restore", utils::wait_for_message(60s));
+
     if (fully.empty()) {
         // It can happen that a tablet exists and contains no data. Just skip it
         co_return;
diff --git a/test/cluster/object_store/test_backup.py b/test/cluster/object_store/test_backup.py
index b76b5ee9ae..ba0eb31c3f 100644
--- a/test/cluster/object_store/test_backup.py
+++ b/test/cluster/object_store/test_backup.py
@@ -7,6 +7,7 @@ import asyncio
 import subprocess
 import tempfile
 import itertools
+import aiohttp
 import pytest
 import time
@@ -810,6 +811,58 @@ async def test_restore_tablets_download_failure(build_mode: str, manager: Manage
     assert 'error' in status and 'Failed to download' in status['error']
 
 
+@pytest.mark.asyncio
+@pytest.mark.parametrize("target", ['coordinator', 'replica', 'api'])
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_restore_tablets_node_loss_resiliency(build_mode: str, manager: ManagerClient, object_storage, target):
+    '''Check how restore handles node loss in the middle of the operation'''
+
+    topology = topo(rf=2, nodes=4, racks=2, dcs=1)
+    servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
+    log = await manager.server_open_log(servers[0].server_id)
+    await log.wait_for("raft_topology - start topology coordinator fiber", timeout=10)
+
+    await manager.disable_tablet_balancing()
+    cql = manager.get_cql()
+
+    num_keys = 24
+    tablet_count = 8
+
+    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = {{'min_tablet_count': {tablet_count}}};")
+        insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, value) VALUES (?, ?)")
+        insert_stmt.consistency_level = ConsistencyLevel.ALL
+        await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), i)) for i in range(num_keys)))
+        snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
+        await asyncio.gather(*(do_backup(s, snap_name, f'{s.server_id}/{snap_name}', ks, 'test', object_storage, manager, logger) for s in servers))
+
+    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = {{'min_tablet_count': {tablet_count}, 'max_tablet_count': {tablet_count}}};")
+
+        await manager.api.enable_injection(servers[2].ip_addr, "pause_tablet_restore", one_shot=True)
+        log = await manager.server_open_log(servers[2].server_id)
+        mark = await log.mark()
+
+        manifests = [f'{s.server_id}/{snap_name}/manifest.json' for s in servers]
+        tid = await manager.api.restore_tablets(servers[1].ip_addr, ks, 'test', snap_name, servers[0].datacenter, object_storage.address, object_storage.bucket_name, manifests)
+        await log.wait_for("pause_tablet_restore: waiting for message", from_mark=mark)
+
+        if target == 'api':
+            await manager.server_stop(servers[1].server_id)
+            with pytest.raises(aiohttp.client_exceptions.ClientConnectorError):
+                await manager.api.wait_task(servers[1].ip_addr, tid)
+        else:
+            if target == 'coordinator':
+                await manager.server_stop(servers[0].server_id)
+                await manager.api.message_injection(servers[2].ip_addr, "pause_tablet_restore")
+            elif target == 'replica':
+                await manager.server_stop(servers[2].server_id)
+
+            # Sometimes the node being killed manages to restore its tablets before it is stopped,
+            # so the best we can do is make sure the restore task finishes at all
+            await asyncio.wait_for(manager.api.wait_task(servers[1].ip_addr, tid), timeout=60)
+
+
 @pytest.mark.asyncio
 async def test_restore_with_non_existing_sstable(manager: ManagerClient, object_storage):
     '''Check that restore task fails well when given a non-existing sstable'''