diff --git a/sstables_loader.cc b/sstables_loader.cc
index 1e89d2cd93..9d3039e246 100644
--- a/sstables_loader.cc
+++ b/sstables_loader.cc
@@ -1162,6 +1162,20 @@ protected:
     virtual future<> run() override {
         auto& loader = _loader.local();
         co_await loader._ss.local().restore_tablets(_tid, _snap_name, _endpoint, _bucket);
+
+        auto& db = loader._db.local();
+        auto s = db.find_schema(_tid);
+        const auto& topo = db.get_token_metadata().get_topology();
+        auto dc = topo.get_datacenter();
+
+        for (const auto& rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
+            auto result = co_await loader._sys_dist_ks.get_snapshot_sstables(_snap_name, s->ks_name(), s->cf_name(), dc, rack);
+            auto it = std::find_if(result.begin(), result.end(), [] (const auto& ent) { return !ent.downloaded; });
+            if (it != result.end()) {
+                llog.warn("Some replicas failed to download SSTables for {}:{} from {}", s->ks_name(), s->cf_name(), _snap_name);
+                throw std::runtime_error(format("Failed to download {}", it->toc_name));
+            }
+        }
     }
 };
 
diff --git a/sstables_loader_helpers.cc b/sstables_loader_helpers.cc
index 72e4680b57..9aa97e4004 100644
--- a/sstables_loader_helpers.cc
+++ b/sstables_loader_helpers.cc
@@ -14,12 +14,15 @@
 #include "replica/database.hh"
 #include "sstables/shared_sstable.hh"
 #include "sstables/sstables.hh"
+#include "utils/error_injection.hh"
 
 future<> download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger) {
     constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true};
     constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10};
     auto components = sstable->all_components();
 
+    utils::get_local_injector().inject("fail_download_sstable", [] { throw std::runtime_error("Failing sstable download"); });
+
     // Move the TOC to the front to be processed first since `sstables::create_stream_sink` takes care
     // of creating behind the scene TemporaryTOC instead of usual one. This assures that in case of failure
     // this partially created SSTable will be cleaned up properly at some point.
diff --git a/test/cluster/object_store/test_backup.py b/test/cluster/object_store/test_backup.py
index 68749262da..4351be10b2 100644
--- a/test/cluster/object_store/test_backup.py
+++ b/test/cluster/object_store/test_backup.py
@@ -778,6 +778,40 @@ async def test_restore_tablets(build_mode: str, manager: ManagerClient, object_s
     await check_mutation_replicas(cql, manager, servers, range(num_keys), topology, logger, ks, 'test')
 
 
+@pytest.mark.asyncio
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_restore_tablets_download_failure(build_mode: str, manager: ManagerClient, object_storage):
+    '''Check that failure to download an sstable propagates back to API'''
+
+    topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)
+    servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
+
+    await manager.disable_tablet_balancing()
+    cql = manager.get_cql()
+
+    num_keys = 12
+    tablet_count = 4
+
+    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks:
+        await cql.run_async(create_schema(ks, 'test', min_tablet_count=tablet_count))
+        insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, value) VALUES (?, ?)")
+        insert_stmt.consistency_level = ConsistencyLevel.ALL
+        await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), i)) for i in range(num_keys)))
+        snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
+        await asyncio.gather(*(do_backup(s, snap_name, f'{s.server_id}/{snap_name}', ks, 'test', object_storage, manager, logger) for s in servers))
+
+    await manager.api.enable_injection(servers[1].ip_addr, "fail_download_sstable", one_shot=True)
+    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = {{'min_tablet_count': {tablet_count}, 'max_tablet_count': {tablet_count}}};")
+
+        logger.info(f'Restore cluster via {servers[0].ip_addr}')
+        manifests = [ f'{s.server_id}/{snap_name}/manifest.json' for s in servers ]
+        tid = await manager.api.restore_tablets(servers[0].ip_addr, ks, 'test', snap_name, servers[0].datacenter, object_storage.address, object_storage.bucket_name, manifests)
+        status = await manager.api.wait_task(servers[0].ip_addr, tid)
+        assert 'state' in status and status['state'] == 'failed'
+        assert 'error' in status and 'Failed to download' in status['error']
+
+
 @pytest.mark.asyncio
 async def test_restore_with_non_existing_sstable(manager: ManagerClient, object_storage):
     '''Check that restore task fails well when given a non-existing sstable'''