From f85c5184f34fd5bc4e3670b30a656d8db5fd50c0 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 26 Mar 2026 15:42:19 +0300 Subject: [PATCH] sstables_loader: Fail tablet-restore task if not all sstables were downloaded When the storage_service::restore_tablets() resolves, it only means that tablet transitions are done, including restore transitions, but not necessarily that they succeeded. So before resolving the restoration task with success, we need to check whether all sstables were downloaded and, if not, resolve the task with an exception. Test included. It uses fault injection to abort the download of a single sstable early, then checks that the error was properly propagated back to the task-waiting API. Signed-off-by: Pavel Emelyanov --- sstables_loader.cc | 14 ++++++++++ sstables_loader_helpers.cc | 3 +++ test/cluster/object_store/test_backup.py | 34 ++++++++++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/sstables_loader.cc b/sstables_loader.cc index 1e89d2cd93..9d3039e246 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -1162,6 +1162,20 @@ protected: virtual future<> run() override { auto& loader = _loader.local(); co_await loader._ss.local().restore_tablets(_tid, _snap_name, _endpoint, _bucket); + + auto& db = loader._db.local(); + auto s = db.find_schema(_tid); + const auto& topo = db.get_token_metadata().get_topology(); + auto dc = topo.get_datacenter(); + + for (const auto& rack : topo.get_datacenter_racks().at(dc) | std::views::keys) { + auto result = co_await loader._sys_dist_ks.get_snapshot_sstables(_snap_name, s->ks_name(), s->cf_name(), dc, rack); + auto it = std::find_if(result.begin(), result.end(), [] (const auto& ent) { return !ent.downloaded; }); + if (it != result.end()) { + llog.warn("Some replicas failed to download SSTables for {}:{} from {}", s->ks_name(), s->cf_name(), _snap_name); + throw std::runtime_error(format("Failed to download {}", it->toc_name)); + } + } } }; diff --git a/sstables_loader_helpers.cc 
b/sstables_loader_helpers.cc index 72e4680b57..9aa97e4004 100644 --- a/sstables_loader_helpers.cc +++ b/sstables_loader_helpers.cc @@ -14,12 +14,15 @@ #include "replica/database.hh" #include "sstables/shared_sstable.hh" #include "sstables/sstables.hh" +#include "utils/error_injection.hh" future download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger) { constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true}; constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10}; auto components = sstable->all_components(); + utils::get_local_injector().inject("fail_download_sstable", [] { throw std::runtime_error("Failing sstable download"); }); + // Move the TOC to the front to be processed first since `sstables::create_stream_sink` takes care // of creating behind the scene TemporaryTOC instead of usual one. This assures that in case of failure // this partially created SSTable will be cleaned up properly at some point. 
diff --git a/test/cluster/object_store/test_backup.py b/test/cluster/object_store/test_backup.py index 68749262da..4351be10b2 100644 --- a/test/cluster/object_store/test_backup.py +++ b/test/cluster/object_store/test_backup.py @@ -778,6 +778,40 @@ async def test_restore_tablets(build_mode: str, manager: ManagerClient, object_s await check_mutation_replicas(cql, manager, servers, range(num_keys), topology, logger, ks, 'test') +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_restore_tablets_download_failure(build_mode: str, manager: ManagerClient, object_storage): + '''Check that failure to download an sstable propagates back to API''' + + topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1) + servers, host_ids = await create_cluster(topology, manager, logger, object_storage) + + await manager.disable_tablet_balancing() + cql = manager.get_cql() + + num_keys = 12 + tablet_count=4 + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(create_schema(ks, 'test', min_tablet_count=tablet_count)) + insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, value) VALUES (?, ?)") + insert_stmt.consistency_level = ConsistencyLevel.ALL + await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), i)) for i in range(num_keys))) + snap_name, sstables = await take_snapshot(ks, servers, manager, logger) + await asyncio.gather(*(do_backup(s, snap_name, f'{s.server_id}/{snap_name}', ks, 'test', object_storage, manager, logger) for s in servers)) + + await manager.api.enable_injection(servers[1].ip_addr, "fail_download_sstable", one_shot=True) + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = 
{{'min_tablet_count': {tablet_count}, 'max_tablet_count': {tablet_count}}};") + + logger.info(f'Restore cluster via {servers[0].ip_addr}') + manifests = [ f'{s.server_id}/{snap_name}/manifest.json' for s in servers ] + tid = await manager.api.restore_tablets(servers[0].ip_addr, ks, 'test', snap_name, servers[0].datacenter, object_storage.address, object_storage.bucket_name, manifests) + status = await manager.api.wait_task(servers[0].ip_addr, tid) + assert 'state' in status and status['state'] == 'failed' + assert 'error' in status and 'Failed to download' in status['error'] + + @pytest.mark.asyncio async def test_restore_with_non_existing_sstable(manager: ManagerClient, object_storage): '''Check that restore task fails well when given a non-existing sstable'''