diff --git a/sstables_loader.cc b/sstables_loader.cc index 791084548f..f0ec901acf 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -32,6 +32,7 @@ #include "message/messaging_service.hh" #include "service/storage_service.hh" #include "sstables_loader_helpers.hh" +#include "db/system_distributed_keyspace.hh" #include "sstables/object_storage_client.hh" #include "utils/rjson.hh" @@ -885,6 +886,125 @@ future sstables_loader::download_new_sstables(sstring ks_name, s std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica)); co_return task->id(); } + +/* attach_sstable: creates an sstable object for table tid from the generation, version and format carried in min_info, sets its level to 0, loads it with unsealed_sstable set, and hands it to the table through add_new_sstable_and_update_cache. The callback passed to that call seals the sstable. Works on _db.local() and logs this_shard_id(). */future<> sstables_loader::attach_sstable(table_id tid, const minimal_sst_info& min_info) const { + auto& db = _db.local(); + auto& table = db.find_column_family(tid); + llog.debug("Adding downloaded SSTables to the table {} on shard {}", table.schema()->cf_name(), this_shard_id()); + auto& sst_manager = table.get_sstables_manager(); + auto sst = sst_manager.make_sstable( + table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format); + sst->set_sstable_level(0); + auto erm = table.get_effective_replication_map(); + sstables::sstable_open_config cfg { + .unsealed_sstable = true, + .ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch(), + }; + co_await sst->load(erm->get_sharder(*table.schema()), cfg); + co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, sst] (sstables::shared_sstable loading_sst) -> future<> { + /* Seal only when the callback is handed the sstable created above; for any other sstable it does nothing. */if (loading_sst == sst) { + auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin()); + co_await loading_sst->seal_sstable(writer_cfg.backup); + } + }); +} + +/* download_tablet_sstables: restores the sstables of tablet tid. Steps: (1) read the restore config from the tablet transition info found in the guard tablet map; (2) query _sys_dist_ks.get_snapshot_sstables for the snapshot, keyspace, table, local datacenter and rack, and the tablet token range; (3) classify the entries with get_sstables_for_tablet into fully and partially contained; (4) collect the fully contained ones from the object store, grouped by prefix; (5) download them on every shard; (6) attach each result on the shard named by its min_info.shard. Throws std::runtime_error when the transition info, the restore config or the snapshot entries are missing, and std::logic_error when any sstable is only partially contained in the tablet range. Returns without error when no sstable is fully contained in the range. */future<> sstables_loader::download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard& guard) { + auto& tmap = guard.get_tablet_map(); + + auto* trinfo = tmap.get_tablet_transition_info(tid.tablet); + if (!trinfo) { + throw std::runtime_error(fmt::format("No 
transition info for tablet {}", tid)); + } + if (!trinfo->restore_cfg) { + throw std::runtime_error(format("No restore config for tablet {} restore transition", tid)); + } + + locator::restore_config restore_cfg = *trinfo->restore_cfg; + llog.info("Downloading sstables for tablet {} from {}@{}/{}", tid, restore_cfg.snapshot_name, restore_cfg.endpoint, restore_cfg.bucket); + + auto s = _db.local().find_schema(tid.table); + auto tablet_range = tmap.get_token_range(tid.tablet); + const auto& topo = guard.get_token_metadata()->get_topology(); + + auto sst_infos = co_await _sys_dist_ks.get_snapshot_sstables(restore_cfg.snapshot_name, s->ks_name(), s->cf_name(), topo.get_datacenter(), topo.get_rack(), + db::consistency_level::LOCAL_QUORUM, tablet_range.start().transform([] (auto& v) { return v.value(); }), tablet_range.end().transform([] (auto& v) { return v.value(); })); + llog.debug("{} SSTables found for tablet {}", sst_infos.size(), tid); + if (sst_infos.empty()) { + throw std::runtime_error(format("No SSTables found in system_distributed.snapshot_sstables for {}", restore_cfg.snapshot_name)); + } + + auto [ fully, partially ] = co_await get_sstables_for_tablet(sst_infos, tablet_range, [] (const auto& si) { return si.first_token; }, [] (const auto& si) { return si.last_token; }); + if (!partially.empty()) { + llog.debug("Sstable {} is partially contained", partially.front().sstable_id); + throw std::logic_error("sstables_partially_contained"); + } + llog.debug("{} SSTables filtered by range {} for tablet {}", fully.size(), tablet_range, tid); + if (fully.empty()) { + // It can happen that a tablet exists and contains no data. 
Just skip it + co_return; + } + + std::unordered_map> toc_names_by_prefix; + for (const auto& e : fully) { + toc_names_by_prefix[e.prefix].emplace_back(e.toc_name); + } + + auto ep_type = _storage_manager.get_endpoint_type(restore_cfg.endpoint); + sstables::sstable_open_config cfg { + .load_bloom_filter = false, + }; + + using sstables_col = std::vector; + using prefix_sstables = std::vector; + + /* One get_sstables_from_object_store call is made per prefix in toc_names_by_prefix; the reducer collects the results into smp::count slots, and slot i is later read by the shard whose this_shard_id() is i. */auto sstables_on_shards = co_await map_reduce(toc_names_by_prefix, [&] (auto ent) { + return replica::distributed_loader::get_sstables_from_object_store(_db, s->ks_name(), s->cf_name(), + std::move(ent.second), restore_cfg.endpoint, ep_type, restore_cfg.bucket, std::move(ent.first), cfg, [&] { return nullptr; }).then_unpack([] (table_id, auto sstables) { + return make_ready_future>(std::move(sstables)); + }); + }, std::vector(smp::count), [&] (std::vector a, std::vector b) { + // We can't move individual elements of b[i], because these + // are lw_shared_ptr-s collected on another shard. So we move + // the whole sstables_col here so that subsequent code will + // walk over it and move the pointers where it wants on proper + // shard. 
+ for (unsigned i = 0; i < smp::count; i++) { + a[i].push_back(std::move(b[i])); + } + return a; + }); + + /* Invoked through container().map_reduce0: each shard flattens its own slot of sstables_on_shards, downloads those sstables (16 is passed to max_concurrent_for_each, presumably the concurrency limit - TODO confirm) and files every resulting min_info under index min_info.shard. The reducer appends the per-shard lists from all shards together. */auto downloaded_ssts = co_await container().map_reduce0( + [tid, &sstables_on_shards](auto& loader) -> future>> { + sstables_col sst_chunk; + for (auto& psst : sstables_on_shards[this_shard_id()]) { + for (auto&& sst : psst) { + sst_chunk.push_back(std::move(sst)); + } + } + std::vector> local_min_infos(smp::count); + co_await max_concurrent_for_each(sst_chunk, 16, [&loader, tid, &local_min_infos](const auto& sst) -> future<> { + auto& table = loader._db.local().find_column_family(tid.table); + auto min_info = co_await download_sstable(loader._db.local(), table, sst, llog); + local_min_infos[min_info.shard].emplace_back(std::move(min_info)); + }); + co_return local_min_infos; + }, + std::vector>(smp::count), + [](auto init, auto&& item) -> std::vector> { + for (std::size_t i = 0; i < item.size(); ++i) { + init[i].append_range(std::move(item[i])); + } + return init; + }); + + /* Each shard takes the list stored at its own index in downloaded_ssts and calls attach_sstable for every entry in it. */co_await container().invoke_on_all([tid, &downloaded_ssts] (auto& loader) -> future<> { + auto shard_ssts = std::move(downloaded_ssts[this_shard_id()]); + co_await max_concurrent_for_each(shard_ssts, 16, [&loader, tid](const auto& min_info) -> future<> { co_await loader.attach_sstable(tid.table, min_info); }); + }); +} + future> get_sstables_for_tablets_for_tests(const std::vector& sstables, std::vector&& tablets_ranges) { return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges)); diff --git a/sstables_loader.hh b/sstables_loader.hh index 30e1894f89..6d5dee382e 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -16,7 +16,7 @@ #include "sstables/shared_sstable.hh" #include "tasks/task_manager.hh" #include "db/consistency_level_type.hh" - +#include "locator/tablets.hh" using namespace seastar; @@ -24,6 +24,8 @@ namespace replica { class database; } +struct minimal_sst_info; + namespace sstables { class storage_manager; } namespace netw { class 
messaging_service; } @@ -39,6 +41,7 @@ class storage_service; } namespace locator { class effective_replication_map; +class tablet_metadata_guard; } struct stream_progress { @@ -100,6 +103,9 @@ private: shared_ptr progress); future> await_topology_quiesced_and_get_erm(table_id table_id); + /* Private helpers used for tablet restore; both are defined in sstables_loader.cc. download_tablet_sstables fetches the sstables of one tablet and attach_sstable adds one downloaded sstable to its table. */future<> download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard&); + future<> attach_sstable(table_id tid, const minimal_sst_info& min_info) const; + public: sstables_loader(sharded& db, sharded& ss,