sstables_loader: Add method to download and attach sstables for a tablet

Extracts the data from snapshot_sstables tables and filters only
sstables belonging to current node and tablet in question, then starts
downloading the matched sstables

Extracted from Ernest PR #28701 and piggy-backs the refactoring from
another Ernest PR #28773. Will be used by next patches.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2026-03-02 15:59:59 +03:00
parent 212a9b04b1
commit 88e179ee13
2 changed files with 127 additions and 1 deletions

View File

@@ -32,6 +32,7 @@
#include "message/messaging_service.hh"
#include "service/storage_service.hh"
#include "sstables_loader_helpers.hh"
#include "db/system_distributed_keyspace.hh"
#include "sstables/object_storage_client.hh"
#include "utils/rjson.hh"
@@ -885,6 +886,125 @@ future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, s
std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
co_return task->id();
}
future<> sstables_loader::attach_sstable(table_id tid, const minimal_sst_info& min_info) const {
auto& db = _db.local();
auto& table = db.find_column_family(tid);
llog.debug("Adding downloaded SSTables to the table {} on shard {}", table.schema()->cf_name(), this_shard_id());
auto& sst_manager = table.get_sstables_manager();
auto sst = sst_manager.make_sstable(
table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format);
sst->set_sstable_level(0);
auto erm = table.get_effective_replication_map();
sstables::sstable_open_config cfg {
.unsealed_sstable = true,
.ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch(),
};
co_await sst->load(erm->get_sharder(*table.schema()), cfg);
co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, sst] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin());
co_await loading_sst->seal_sstable(writer_cfg.backup);
}
});
}
future<> sstables_loader::download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard& guard) {
auto& tmap = guard.get_tablet_map();
auto* trinfo = tmap.get_tablet_transition_info(tid.tablet);
if (!trinfo) {
throw std::runtime_error(fmt::format("No transition info for tablet {}", tid));
}
if (!trinfo->restore_cfg) {
throw std::runtime_error(format("No restore config for tablet {} restore transition", tid));
}
locator::restore_config restore_cfg = *trinfo->restore_cfg;
llog.info("Downloading sstables for tablet {} from {}@{}/{}", tid, restore_cfg.snapshot_name, restore_cfg.endpoint, restore_cfg.bucket);
auto s = _db.local().find_schema(tid.table);
auto tablet_range = tmap.get_token_range(tid.tablet);
const auto& topo = guard.get_token_metadata()->get_topology();
auto sst_infos = co_await _sys_dist_ks.get_snapshot_sstables(restore_cfg.snapshot_name, s->ks_name(), s->cf_name(), topo.get_datacenter(), topo.get_rack(),
db::consistency_level::LOCAL_QUORUM, tablet_range.start().transform([] (auto& v) { return v.value(); }), tablet_range.end().transform([] (auto& v) { return v.value(); }));
llog.debug("{} SSTables found for tablet {}", sst_infos.size(), tid);
if (sst_infos.empty()) {
throw std::runtime_error(format("No SSTables found in system_distributed.snapshot_sstables for {}", restore_cfg.snapshot_name));
}
auto [ fully, partially ] = co_await get_sstables_for_tablet(sst_infos, tablet_range, [] (const auto& si) { return si.first_token; }, [] (const auto& si) { return si.last_token; });
if (!partially.empty()) {
llog.debug("Sstable {} is partially contained", partially.front().sstable_id);
throw std::logic_error("sstables_partially_contained");
}
llog.debug("{} SSTables filtered by range {} for tablet {}", fully.size(), tablet_range, tid);
if (fully.empty()) {
// It can happen that a tablet exists and contains no data. Just skip it
co_return;
}
std::unordered_map<sstring, std::vector<sstring>> toc_names_by_prefix;
for (const auto& e : fully) {
toc_names_by_prefix[e.prefix].emplace_back(e.toc_name);
}
auto ep_type = _storage_manager.get_endpoint_type(restore_cfg.endpoint);
sstables::sstable_open_config cfg {
.load_bloom_filter = false,
};
using sstables_col = std::vector<sstables::shared_sstable>;
using prefix_sstables = std::vector<sstables_col>;
auto sstables_on_shards = co_await map_reduce(toc_names_by_prefix, [&] (auto ent) {
return replica::distributed_loader::get_sstables_from_object_store(_db, s->ks_name(), s->cf_name(),
std::move(ent.second), restore_cfg.endpoint, ep_type, restore_cfg.bucket, std::move(ent.first), cfg, [&] { return nullptr; }).then_unpack([] (table_id, auto sstables) {
return make_ready_future<std::vector<sstables_col>>(std::move(sstables));
});
}, std::vector<prefix_sstables>(smp::count), [&] (std::vector<prefix_sstables> a, std::vector<sstables_col> b) {
// We can't move individual elements of b[i], because these
// are lw_shared_ptr-s collected on another shard. So we move
// the whole sstables_col here so that subsequent code will
// walk over it and move the pointers where it wants on proper
// shard.
for (unsigned i = 0; i < smp::count; i++) {
a[i].push_back(std::move(b[i]));
}
return a;
});
auto downloaded_ssts = co_await container().map_reduce0(
[tid, &sstables_on_shards](auto& loader) -> future<std::vector<std::vector<minimal_sst_info>>> {
sstables_col sst_chunk;
for (auto& psst : sstables_on_shards[this_shard_id()]) {
for (auto&& sst : psst) {
sst_chunk.push_back(std::move(sst));
}
}
std::vector<std::vector<minimal_sst_info>> local_min_infos(smp::count);
co_await max_concurrent_for_each(sst_chunk, 16, [&loader, tid, &local_min_infos](const auto& sst) -> future<> {
auto& table = loader._db.local().find_column_family(tid.table);
auto min_info = co_await download_sstable(loader._db.local(), table, sst, llog);
local_min_infos[min_info.shard].emplace_back(std::move(min_info));
});
co_return local_min_infos;
},
std::vector<std::vector<minimal_sst_info>>(smp::count),
[](auto init, auto&& item) -> std::vector<std::vector<minimal_sst_info>> {
for (std::size_t i = 0; i < item.size(); ++i) {
init[i].append_range(std::move(item[i]));
}
return init;
});
co_await container().invoke_on_all([tid, &downloaded_ssts] (auto& loader) -> future<> {
auto shard_ssts = std::move(downloaded_ssts[this_shard_id()]);
co_await max_concurrent_for_each(shard_ssts, 16, [&loader, tid](const auto& min_info) -> future<> { co_await loader.attach_sstable(tid.table, min_info); });
});
}
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges));

View File

@@ -16,7 +16,7 @@
#include "sstables/shared_sstable.hh"
#include "tasks/task_manager.hh"
#include "db/consistency_level_type.hh"
#include "locator/tablets.hh"
using namespace seastar;
@@ -24,6 +24,8 @@ namespace replica {
class database;
}
struct minimal_sst_info;
namespace sstables { class storage_manager; }
namespace netw { class messaging_service; }
@@ -39,6 +41,7 @@ class storage_service;
}
namespace locator {
class effective_replication_map;
class tablet_metadata_guard;
}
struct stream_progress {
@@ -100,6 +103,9 @@ private:
shared_ptr<stream_progress> progress);
future<seastar::shared_ptr<const locator::effective_replication_map>> await_topology_quiesced_and_get_erm(table_id table_id);
future<> download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard&);
future<> attach_sstable(table_id tid, const minimal_sst_info& min_info) const;
public:
sstables_loader(sharded<replica::database>& db,
sharded<service::storage_service>& ss,