scylladb/sstables_loader.cc
Tomasz Grabiec 9daed59af9 Merge 'Tablet-aware restore' from Pavel Emelyanov
The mechanics of the restore are as follows:

- A /storage_service/tablets/restore API is called with (keyspace, table, endpoint, bucket, manifests) parameters
  - First, it populates the system_distributed.snapshot_sstables table with the data read from the manifests
  - Then it emplaces a bunch of tablet transitions (of a new "restore" kind), one for each tablet
- The topology coordinator handles the "restore" transition by calling a new RESTORE_TABLET RPC against all the current tablet replicas
- Each replica handles the RPC verb by
  - Reading the snapshot_sstables table
  - Filtering the read sstable infos against current node and tablet being handled
  - Downloading and attaching the filtered sstables
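The flow above is kicked off by a single API call. A hypothetical sketch (the endpoint path is taken from this description; the parameter names match the list above, but how the real endpoint encodes them — query string vs. body — is an assumption):

```shell
# Hypothetical invocation sketch -- parameter names from the description
# above; the actual encoding used by the endpoint may differ.
curl -X POST "http://127.0.0.1:10000/storage_service/tablets/restore" \
  -G \
  --data-urlencode "keyspace=ks" \
  --data-urlencode "table=t" \
  --data-urlencode "endpoint=s3.example.com" \
  --data-urlencode "bucket=backups" \
  --data-urlencode "manifests=snapshots/manifest.json"
```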

This PR includes the system_distributed.snapshot_sstables table from @robertbindar and preparation work from @kreuzerkrieg that extracts raw sstable downloading and attaching out of the existing generic sstables-loading code.

This is the first step towards SCYLLADB-197 and lacks many things. In particular:
- the API only works for a single-DC cluster
- the caller needs to "lock" tablet boundaries with min/max tablet count
- not abortable
- no progress tracking
- sub-optimal (re-invoking the restore API will re-download everything again)
- not re-attachable (if the API node dies, restoration proceeds, but the caller cannot "wait" for it to complete via another node)
- nodes download sstables in the maintenance/streaming sched group (should be moved to maintenance/backup)

Other follow-up items:
- have an actual swagger object specification for `backup_location`
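For that follow-up item, one possible shape of the object (purely illustrative — field names are assumptions derived from the restore API parameters, not a final spec):

```yaml
# Hypothetical swagger sketch for backup_location -- not a final spec.
backup_location:
  type: object
  properties:
    endpoint:
      type: string
      description: Object storage endpoint holding the backup
    bucket:
      type: string
      description: Bucket containing the backup sstables
    prefix:
      type: string
      description: Key prefix under the bucket
```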

Closes #28436
Closes #28657
Closes #28773

Closes scylladb/scylladb#28763

* github.com:scylladb/scylladb:
  test: Add test for backup vs migration race
  test: Restore resilience test
  sstables_loader: Fail tablet-restore task if not all sstables were downloaded
  sstables_loader: mark sstables as downloaded after attaching
  sstables_loader: return shared_sstable from attach_sstable
  db: add update_sstable_download_status method
  db: add downloaded column to snapshot_sstables
  db: extract snapshot_sstables TTL into class constant
  test: Add a test for tablet-aware restore
  tablets: Implement tablet-aware cluster-wide restore
  messaging: Add RESTORE_TABLET RPC verb
  sstables_loader: Add method to download and attach sstables for a tablet
  tablets: Add restore_config to tablet_transition_info
  sstables_loader: Add restore_tablets task skeleton
  test: Add rest_client helper to kick newly introduced API endpoint
  api: Add /storage_service/tablets/restore endpoint skeleton
  sstables_loader: Add keyspace and table arguments to manfiest loading helper
  sstables_loader_helpers: just reformat the code
  sstables_loader_helpers: generalize argument and variable names
  sstables_loader_helpers: generalize get_sstables_for_tablet
  sstables_loader_helpers: add token getters for tablet filtering
  sstables_loader_helpers: remove underscores from struct members
  sstables_loader: move download_sstable and get_sstables_for_tablet
  sstables_loader: extract single-tablet SST filtering
  sstables_loader: make download_sstable static
  sstables_loader: fix formating of the new `download_sstable` function
  sstables_loader: extract single SST download into a function
  sstables_loader: add shard_id to minimal_sst_info
  sstables_loader: add function for parsing backup manifests
  split utility functions for creating test data from database_test
  export make_storage_options_config from lib/test_services
  rjson: Add helpers for conversions to dht::token and sstable_id
  Add system_distributed_keyspace.snapshot_sstables
  add get_system_distributed_keyspace to cql_test_env
  code: Add system_distributed_keyspace dependency to sstables_loader
  storage_service: Export export handle_raft_rpc() helper
  storage_service: Export do_tablet_operation()
  storage_service: Split transit_tablet() into two
  tablets: Add braces around tablet_transition_kind::repair switch
2026-04-21 02:27:24 +02:00


/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <fmt/ranges.h>
#include <seastar/core/coroutine.hh>
#include <seastar/core/map_reduce.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/shared_mutex.hh>
#include <seastar/core/units.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/rpc/rpc.hh>
#include "sstables_loader.hh"
#include "db/config.hh"
#include "dht/auto_refreshing_sharder.hh"
#include "replica/distributed_loader.hh"
#include "replica/database.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstables.hh"
#include "gms/inet_address.hh"
#include "gms/feature_service.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "streaming/stream_reason.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "locator/abstract_replication_strategy.hh"
#include "message/messaging_service.hh"
#include "service/storage_service.hh"
#include "utils/error_injection.hh"
#include "sstables_loader_helpers.hh"
#include "db/system_distributed_keyspace.hh"
#include "idl/sstables_loader.dist.hh"
#include "sstables/object_storage_client.hh"
#include "utils/rjson.hh"
#include <cfloat>
#include <algorithm>
static logging::logger llog("sstables_loader");
namespace {
class send_meta_data {
locator::host_id _node;
seastar::rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> _sink;
seastar::rpc::source<int32_t> _source;
const bool _abort_supported = false;
bool _error_from_peer = false;
size_t _num_partitions_sent = 0;
size_t _num_bytes_sent = 0;
future<> _receive_done;
private:
future<> do_receive() {
int32_t status = 0;
while (auto status_opt = co_await _source()) {
status = std::get<0>(*status_opt);
llog.debug("send_meta_data: got error code={}, from node={}", status, _node);
if (status == -1) {
_error_from_peer = true;
}
}
llog.debug("send_meta_data: finished reading source from node={}", _node);
if (_error_from_peer) {
throw std::runtime_error(format("send_meta_data: got error code={} from node={}", status, _node));
}
co_return;
}
public:
send_meta_data(locator::host_id node,
seastar::rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink,
seastar::rpc::source<int32_t> source, bool abort_supported)
: _node(std::move(node))
, _sink(std::move(sink))
, _source(std::move(source))
, _abort_supported(abort_supported)
, _receive_done(make_ready_future<>()) {
}
void receive() {
_receive_done = do_receive();
}
future<> send(const frozen_mutation_fragment& fmf, bool is_partition_start) {
if (_error_from_peer) {
throw std::runtime_error(format("send_meta_data: got error from peer node={}", _node));
}
auto size = fmf.representation().size();
if (is_partition_start) {
++_num_partitions_sent;
}
_num_bytes_sent += size;
llog.trace("send_meta_data: send mf to node={}, size={}", _node, size);
co_return co_await _sink(fmf, streaming::stream_mutation_fragments_cmd::mutation_fragment_data);
}
future<> finish(bool failed, bool aborted) {
std::exception_ptr eptr;
try {
if (_abort_supported && aborted) {
co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::abort);
} else if (failed) {
co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::error);
} else {
co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::end_of_stream);
}
} catch (...) {
eptr = std::current_exception();
llog.warn("send_meta_data: failed to send {} to node={}, err={}",
failed ? "stream_mutation_fragments_cmd::error" : "stream_mutation_fragments_cmd::end_of_stream", _node, eptr);
}
try {
co_await _sink.close();
} catch (...) {
eptr = std::current_exception();
llog.warn("send_meta_data: failed to close sink to node={}, err={}", _node, eptr);
}
try {
co_await std::move(_receive_done);
} catch (...) {
eptr = std::current_exception();
llog.warn("send_meta_data: failed to process source from node={}, err={}", _node, eptr);
}
if (eptr) {
std::rethrow_exception(eptr);
}
co_return;
}
size_t num_partitions_sent() {
return _num_partitions_sent;
}
size_t num_bytes_sent() {
return _num_bytes_sent;
}
};
} // anonymous namespace
using primary_replica_only = bool_class<struct primary_replica_only_tag>;
using unlink_sstables = bool_class<struct unlink_sstables_tag>;
class sstable_streamer {
protected:
using stream_scope = sstables_loader::stream_scope;
netw::messaging_service& _ms;
replica::database& _db;
replica::table& _table;
locator::effective_replication_map_ptr _erm;
std::vector<sstables::shared_sstable> _sstables;
const primary_replica_only _primary_replica_only;
const unlink_sstables _unlink_sstables;
const stream_scope _stream_scope;
public:
sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
: _ms(ms)
, _db(db)
, _table(db.find_column_family(table_id))
, _erm(std::move(erm))
, _sstables(std::move(sstables))
, _primary_replica_only(primary)
, _unlink_sstables(unlink)
, _stream_scope(scope)
{
// By sorting SSTables by their primary key, we allow SSTable runs to be
// incrementally streamed.
// Overlapping run fragments can have their content deduplicated, reducing
// the amount of data we need to put on the wire.
// Elements are popped off from the back of the vector, therefore we're sorting
// it in descending order, to start from the smaller tokens.
std::ranges::sort(_sstables, [] (const sstables::shared_sstable& x, const sstables::shared_sstable& y) {
return x->compare_by_first_key(*y) > 0;
});
}
virtual ~sstable_streamer() {}
virtual future<> stream(shared_ptr<stream_progress> progress);
host_id_vector_replica_set get_endpoints(const dht::token& token) const;
future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector<sstables::shared_sstable>);
protected:
virtual host_id_vector_replica_set get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const;
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>, shared_ptr<stream_progress> progress);
private:
host_id_vector_replica_set get_all_endpoints(const dht::token& token) const;
};
class tablet_sstable_streamer : public sstable_streamer {
sharded<replica::database>& _db;
const locator::tablet_map& _tablet_map;
public:
tablet_sstable_streamer(netw::messaging_service& ms, sharded<replica::database>& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
: sstable_streamer(ms, db.local(), table_id, std::move(erm), std::move(sstables), primary, unlink, scope)
, _db(db)
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
}
virtual future<> stream(shared_ptr<stream_progress> on_streamed) override;
virtual host_id_vector_replica_set get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const override;
private:
host_id_vector_replica_set to_replica_set(const locator::tablet_replica_set& replicas) const {
host_id_vector_replica_set result;
result.reserve(replicas.size());
for (auto&& replica : replicas) {
result.push_back(replica.host);
}
return result;
}
using sst_classification_info = std::vector<std::vector<minimal_sst_info>>;
future<> attach_sstable(shard_id from_shard, const sstring& ks, const sstring& cf, const minimal_sst_info& min_info) const {
llog.debug("Adding downloaded SSTables to the table {} on shard {}, submitted from shard {}", _table.schema()->cf_name(), this_shard_id(), from_shard);
auto& db = _db.local();
auto& table = db.find_column_family(ks, cf);
auto& sst_manager = table.get_sstables_manager();
auto sst = sst_manager.make_sstable(
table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format);
sst->set_sstable_level(0);
auto units = co_await sst_manager.dir_semaphore().get_units(1);
sstables::sstable_open_config cfg {
.unsealed_sstable = true,
};
co_await sst->load(table.get_effective_replication_map()->get_sharder(*table.schema()), cfg);
co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, sst] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin());
co_await loading_sst->seal_sstable(writer_cfg.backup);
}
});
}
future<>
stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, shared_ptr<stream_progress> progress) {
if (_stream_scope != stream_scope::node) {
co_return co_await stream_sstables(pr, std::move(sstables), std::move(progress));
}
llog.debug("Directly downloading {} fully contained SSTables to local node from object storage.", sstables.size());
auto downloaded_ssts = co_await download_fully_contained_sstables(std::move(sstables));
co_await smp::invoke_on_all(
[this, &downloaded_ssts, from = this_shard_id(), ks = _table.schema()->ks_name(), cf = _table.schema()->cf_name()] -> future<> {
auto shard_ssts = std::move(downloaded_ssts[this_shard_id()]);
for (const auto& min_info : shard_ssts) {
co_await attach_sstable(from, ks, cf, min_info);
}
});
if (progress) {
progress->advance(std::accumulate(downloaded_ssts.cbegin(), downloaded_ssts.cend(), 0., [](float acc, const auto& v) { return acc + v.size(); }));
}
}
future<sst_classification_info> download_fully_contained_sstables(std::vector<sstables::shared_sstable> sstables) const {
sst_classification_info downloaded_sstables(smp::count);
for (const auto& sstable : sstables) {
auto min_info = co_await download_sstable(_db.local(), _table, sstable, llog);
downloaded_sstables[min_info.shard].emplace_back(min_info);
}
co_return downloaded_sstables;
}
bool tablet_in_scope(locator::tablet_id) const;
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
// Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from
// the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else.
static future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
};
host_id_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const {
auto host_filter = [&topo = _erm->get_topology(), scope = _stream_scope] (const locator::host_id& ep) {
switch (scope) {
case stream_scope::all:
return true;
case stream_scope::dc:
return topo.get_datacenter(ep) == topo.get_datacenter();
case stream_scope::rack:
return topo.get_location(ep) == topo.get_location();
case stream_scope::node:
return topo.is_me(ep);
}
};
if (_primary_replica_only) {
if (_stream_scope == stream_scope::node) {
throw std::runtime_error("Node scoped streaming of primary replica only is not supported");
}
return get_primary_endpoints(token, std::move(host_filter));
}
return get_all_endpoints(token) | std::views::filter(std::move(host_filter)) | std::ranges::to<host_id_vector_replica_set>();
}
host_id_vector_replica_set sstable_streamer::get_all_endpoints(const dht::token& token) const {
auto current_targets = _erm->get_natural_replicas(token);
auto pending = _erm->get_pending_replicas(token);
std::move(pending.begin(), pending.end(), std::back_inserter(current_targets));
return current_targets;
}
host_id_vector_replica_set sstable_streamer::get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const {
auto current_targets = _erm->get_natural_replicas(token) | std::views::filter(std::move(filter)) | std::ranges::to<host_id_vector_replica_set>();
current_targets.resize(1);
return current_targets;
}
host_id_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const {
auto tid = _tablet_map.get_tablet_id(token);
auto replicas = locator::get_primary_replicas(_tablet_map, tid, _erm->get_topology(), [filter = std::move(filter)] (const locator::tablet_replica& replica) {
return filter(replica.host);
});
return to_replica_set(replicas);
}
future<> sstable_streamer::stream(shared_ptr<stream_progress> progress) {
if (progress) {
progress->start(_sstables.size());
}
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
co_await stream_sstables(full_partition_range, std::move(_sstables), std::move(progress));
}
bool tablet_sstable_streamer::tablet_in_scope(locator::tablet_id tid) const {
if (_stream_scope == stream_scope::all) {
return true;
}
const auto& topo = _erm->get_topology();
for (const auto& r : _tablet_map.get_tablet_info(tid).replicas) {
switch (_stream_scope) {
case stream_scope::node:
if (topo.is_me(r.host)) {
return true;
}
break;
case stream_scope::rack:
if (topo.get_location(r.host) == topo.get_location()) {
return true;
}
break;
case stream_scope::dc:
if (topo.get_datacenter(r.host) == topo.get_datacenter()) {
return true;
}
break;
case stream_scope::all: // checked above already, but still need it here
return true;
}
}
return false;
}
// The tablet_sstable_streamer implements a hierarchical streaming strategy:
//
// 1. Top Level (Per-Tablet Streaming):
// - Unlike vnode streaming, this streams sstables on a tablet-by-tablet basis
// - For a table with M tablets, each tablet[i] maps to its own set of SSTable files
// stored in tablet_to_sstables[i]
// - If tablet_to_sstables[i] is empty, that tablet's streaming is considered complete
// - Progress tracking advances by 1.0 unit when an entire tablet completes streaming
//
// 2. Inner Level (Per-SSTable Streaming):
// - Within each tablet's batch, individual SSTables are streamed in smaller sub-batches
// - The per_tablet_stream_progress class tracks streaming progress at this level:
// - Updates when a set of SSTables completes streaming
// - For n completed SSTables, advances by (n / total_sstables_in_current_tablet)
// - Provides granular tracking for the inner level streaming operations
// - Helps estimate completion time for the current tablet's batch
//
// Progress Tracking:
// The streaming progress is monitored at two granularity levels:
// - Tablet level: Overall progress where each tablet contributes 1.0 units
// - SSTable level: Progress of individual SSTable transfers within a tablet,
// managed by the per_tablet_stream_progress class
//
// Note: For simplicity, we assume uniform streaming time across tablets, even though
// tablets may vary significantly in their SSTable count or size. This assumption
// helps in progress estimation without requiring prior knowledge of SSTable
// distribution across tablets.
struct per_tablet_stream_progress : public stream_progress {
private:
shared_ptr<stream_progress> _per_table_progress;
const size_t _num_sstables_mapped;
public:
per_tablet_stream_progress(shared_ptr<stream_progress> per_table_progress,
size_t num_sstables_mapped)
: _per_table_progress(std::move(per_table_progress))
, _num_sstables_mapped(num_sstables_mapped) {
if (_per_table_progress && _num_sstables_mapped == 0) {
// consider this tablet completed if nothing to stream
_per_table_progress->advance(1.0);
}
}
void advance(float num_sstable_streamed) override {
// we should not move backward
assert(num_sstable_streamed >= 0.);
// we should call advance() only if the current tablet maps to at least
// one sstable.
assert(_num_sstables_mapped > 0);
if (_per_table_progress) {
_per_table_progress->advance(num_sstable_streamed / _num_sstables_mapped);
}
}
};
future<std::vector<tablet_sstable_collection>> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
auto tablets_sstables =
tablets_ranges | std::views::transform([](auto range) { return tablet_sstable_collection{.tablet_range = range}; }) | std::ranges::to<std::vector>();
if (sstables.empty() || tablets_sstables.empty()) {
co_return std::move(tablets_sstables);
}
// sstables are sorted by first key in reverse order.
auto reversed_sstables = sstables | std::views::reverse;
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : tablets_sstables) {
auto [fully, partially] = co_await get_sstables_for_tablet(reversed_sstables, tablet_range, [](const auto& sst) { return sst->get_first_decorated_key().token(); }, [](const auto& sst) { return sst->get_last_decorated_key().token(); });
sstables_fully_contained = std::move(fully);
sstables_partially_contained = std::move(partially);
}
co_return std::move(tablets_sstables);
}
future<> tablet_sstable_streamer::stream(shared_ptr<stream_progress> progress) {
if (progress) {
progress->start(_tablet_map.tablet_count());
}
auto classified_sstables = co_await get_sstables_for_tablets(
_sstables, _tablet_map.tablet_ids() | std::views::filter([this](auto tid) { return tablet_in_scope(tid); }) | std::views::transform([this](auto tid) {
return _tablet_map.get_token_range(tid);
}) | std::ranges::to<std::vector>());
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : classified_sstables) {
auto per_tablet_progress = make_shared<per_tablet_stream_progress>(
progress,
sstables_fully_contained.size() + sstables_partially_contained.size());
auto tablet_pr = dht::to_partition_range(tablet_range);
if (!sstables_partially_contained.empty()) {
llog.debug("Streaming {} partially contained SSTables.", sstables_partially_contained.size());
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), per_tablet_progress);
}
if (!sstables_fully_contained.empty()) {
llog.debug("Streaming {} fully contained SSTables.", sstables_fully_contained.size());
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), per_tablet_progress);
}
}
}
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, shared_ptr<stream_progress> progress) {
size_t nr_sst_total = sstables.size();
size_t nr_sst_current = 0;
while (!sstables.empty()) {
co_await utils::get_local_injector().inject("load_and_stream_before_streaming_batch",
utils::wait_for_message(60s));
const size_t batch_sst_nr = std::min(16uz, sstables.size());
auto sst_processed = sstables
| std::views::reverse
| std::views::take(batch_sst_nr)
| std::ranges::to<std::vector>();
sstables.erase(sstables.end() - batch_sst_nr, sstables.end());
auto ops_uuid = streaming::plan_id{utils::make_random_uuid()};
llog.info("load_and_stream: started ops_uuid={}, process [{}-{}] out of {} sstables=[{}]",
ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total,
fmt::join(sst_processed | std::views::transform([] (auto sst) { return sst->get_filename(); }), ", "));
nr_sst_current += sst_processed.size();
co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed));
if (progress) {
progress->advance(batch_sst_nr);
}
}
}
future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid, const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token));
auto s = _table.schema();
const auto cf_id = s->id();
const auto reason = streaming::stream_reason::repair;
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, std::move(token_range)));
size_t estimated_partitions = 0;
for (auto& sst : sstables) {
estimated_partitions += co_await sst->estimated_keys_for_range(token_range);
sst_set->insert(sst);
}
auto start_time = std::chrono::steady_clock::now();
host_id_vector_replica_set current_targets;
std::unordered_map<locator::host_id, send_meta_data> metas;
size_t num_partitions_processed = 0;
size_t num_bytes_read = 0;
auto permit = co_await _db.obtain_reader_permit(_table, "sstables_loader::load_and_stream()", db::no_timeout, {});
auto reader = mutation_fragment_v1_stream(_table.make_streaming_reader(s, std::move(permit), pr, sst_set, gc_clock::now()));
std::exception_ptr eptr;
bool failed = false;
try {
while (auto mf = co_await reader()) {
bool is_partition_start = mf->is_partition_start();
if (is_partition_start) {
++num_partitions_processed;
auto& start = mf->as_partition_start();
const auto& current_dk = start.key();
current_targets = get_endpoints(current_dk.token());
llog.trace("load_and_stream: ops_uuid={}, current_dk={}, current_targets={}", ops_uuid,
current_dk.token(), current_targets);
for (auto& node : current_targets) {
if (!metas.contains(node)) {
auto [sink, source] = co_await _ms.make_sink_and_source_for_stream_mutation_fragments(reader.schema()->version(),
ops_uuid, cf_id, estimated_partitions, reason, service::default_session_id, node);
bool abort_supported = _ms.supports_load_and_stream_abort_rpc_message();
llog.debug("load_and_stream: ops_uuid={}, make sink and source for node={}", ops_uuid, node);
metas.emplace(node, send_meta_data(node, std::move(sink), std::move(source), abort_supported));
metas.at(node).receive();
}
}
}
frozen_mutation_fragment fmf = freeze(*s, *mf);
num_bytes_read += fmf.representation().size();
co_await coroutine::parallel_for_each(current_targets, [&metas, &fmf, is_partition_start] (const locator::host_id& node) {
return metas.at(node).send(fmf, is_partition_start);
});
}
} catch (...) {
failed = true;
eptr = std::current_exception();
llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, send_phase, err={}",
ops_uuid, s->ks_name(), s->cf_name(), eptr);
}
co_await reader.close();
try {
co_await coroutine::parallel_for_each(metas.begin(), metas.end(), [failed, eptr] (std::pair<const locator::host_id, send_meta_data>& pair) {
auto& meta = pair.second;
if (eptr) {
try {
std::rethrow_exception(eptr);
} catch (const abort_requested_exception&) {
return meta.finish(failed, true);
} catch (...) {
// just fall through
}
}
return meta.finish(failed, false);
});
} catch (...) {
failed = true;
eptr = std::current_exception();
llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, finish_phase, err={}",
ops_uuid, s->ks_name(), s->cf_name(), eptr);
}
if (!failed && _unlink_sstables) {
try {
co_await coroutine::parallel_for_each(sstables, [&] (sstables::shared_sstable& sst) {
llog.debug("load_and_stream: ops_uuid={}, ks={}, table={}, remove sst={}",
ops_uuid, s->ks_name(), s->cf_name(), sst->toc_filename());
return sst->mark_for_deletion();
});
} catch (...) {
failed = true;
eptr = std::current_exception();
llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, del_sst_phase, err={}",
ops_uuid, s->ks_name(), s->cf_name(), eptr);
}
}
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start_time).count();
for (auto& [node, meta] : metas) {
llog.info("load_and_stream: ops_uuid={}, ks={}, table={}, target_node={}, num_partitions_sent={}, num_bytes_sent={}",
ops_uuid, s->ks_name(), s->cf_name(), node, meta.num_partitions_sent(), meta.num_bytes_sent());
}
auto partition_rate = std::fabs(duration) > FLT_EPSILON ? num_partitions_processed / duration : 0;
auto bytes_rate = std::fabs(duration) > FLT_EPSILON ? num_bytes_read / duration / 1024 / 1024 : 0;
auto status = failed ? "failed" : "succeeded";
llog.info("load_and_stream: finished ops_uuid={}, ks={}, table={}, partitions_processed={} partitions, bytes_processed={} bytes, partitions_per_second={} partitions/s, bytes_per_second={} MiB/s, duration={} s, status={}",
ops_uuid, s->ks_name(), s->cf_name(), num_partitions_processed, num_bytes_read, partition_rate, bytes_rate, duration, status);
if (failed) {
std::rethrow_exception(eptr);
}
}
future<locator::effective_replication_map_ptr> sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) {
// By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle
// of a topology operation that changes the token range boundaries, e.g. split or merge.
// Split, for example, first executes the barrier and then splits the tablets.
// So it can happen a sstable is generated between those steps and will incorrectly span two
// tablets. We want to serialize load-and-stream and split finalization (a topology op).
locator::effective_replication_map_ptr erm;
while (true) {
auto& t = _db.local().find_column_family(table_id);
erm = t.get_effective_replication_map();
auto expected_topology_version = erm->get_token_metadata().get_version();
auto& ss = _ss.local();
// The awaiting only works with raft enabled, and we only need it with tablets,
// so let's bypass the awaiting when tablet is disabled.
if (!t.uses_tablets()) {
break;
}
// optimistically attempt to grab an erm on quiesced topology
if (co_await ss.verify_topology_quiesced(expected_topology_version)) {
break;
}
erm = nullptr;
co_await _ss.local().await_topology_quiesced();
}
co_return std::move(erm);
}
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, bool unlink, stream_scope scope,
shared_ptr<stream_progress> progress) {
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
// throughout its lifetime.
auto erm = co_await await_topology_quiesced_and_get_erm(table_id);
// Obtain a phaser guard to prevent the table from being destroyed
// while streaming is in progress. table::stop() calls
// _pending_streams_phaser.close() which blocks until all outstanding
// stream_in_progress() guards are released, so holding this guard
// keeps the table alive for the entire streaming operation.
// find_column_family throws no_such_column_family if the table was
// already dropped before we got here.
auto& tbl = _db.local().find_column_family(table_id);
auto stream_guard = tbl.stream_in_progress();
std::unique_ptr<sstable_streamer> streamer;
if (tbl.uses_tablets()) {
streamer =
std::make_unique<tablet_sstable_streamer>(_messaging, _db, table_id, std::move(erm), std::move(sstables), primary, unlink_sstables(unlink), scope);
} else {
streamer =
std::make_unique<sstable_streamer>(_messaging, _db.local(), table_id, std::move(erm), std::move(sstables), primary, unlink_sstables(unlink), scope);
}
co_await streamer->stream(progress);
}
// For more details, see distributed_loader::process_upload_dir().
// All the global operations are going to happen here, and just the reloading happens
// in there.
future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
bool load_and_stream, bool primary, bool skip_cleanup, bool skip_reshape, stream_scope scope) {
if (_loading_new_sstables) {
throw std::runtime_error("Already loading SSTables. Try again later");
} else {
_loading_new_sstables = true;
}
co_await coroutine::switch_to(_sched_group);
sstring load_and_stream_desc = fmt::format("{}", load_and_stream);
const auto& rs = _db.local().find_keyspace(ks_name).get_replication_strategy();
if (rs.is_per_table() && !load_and_stream) {
load_and_stream = true;
load_and_stream_desc = "auto-enabled-for-tablets";
}
if (load_and_stream && skip_cleanup) {
throw std::runtime_error("Skipping cleanup is not possible when doing load-and-stream");
}
if (load_and_stream && skip_reshape) {
throw std::runtime_error("Skipping reshape is not possible when doing load-and-stream");
}
llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, skip_cleanup={}, skip_reshape={}, scope={}",
ks_name, cf_name, load_and_stream_desc, primary, skip_cleanup, skip_reshape, scope);
try {
if (load_and_stream) {
::table_id table_id;
std::vector<std::vector<sstables::shared_sstable>> sstables_on_shards;
// Load-and-stream reads the entire content from SSTables, therefore it can afford to discard the bloom filter
// that might otherwise consume a significant amount of memory.
sstables::sstable_open_config cfg {
.load_bloom_filter = false,
};
std::tie(table_id, sstables_on_shards) = co_await replica::distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name, cfg);
co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary, scope] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only(primary), true, scope, {});
});
} else {
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, _view_building_worker, ks_name, cf_name, skip_cleanup, skip_reshape);
}
} catch (...) {
llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",
ks_name, cf_name, load_and_stream, primary, std::current_exception());
_loading_new_sstables = false;
throw;
}
llog.info("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=succeeded",
ks_name, cf_name, load_and_stream, primary);
_loading_new_sstables = false;
co_return;
}
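The `_loading_new_sstables` flag above rejects concurrent loads and is cleared on both the success and the failure path. A minimal standalone sketch of that non-reentrancy pattern (the `loader_sketch` type is illustrative, not Scylla's):

```cpp
#include <cassert>
#include <stdexcept>

// Non-reentrancy guard: reject a second concurrent load, and clear the
// flag on both the success and the failure path (mirrors how
// _loading_new_sstables is handled in load_new_sstables()).
struct loader_sketch {
    bool loading = false;
    int completed = 0;

    void load(bool fail) {
        if (loading) {
            throw std::runtime_error("Already loading SSTables. Try again later");
        }
        loading = true;
        try {
            if (fail) {
                throw std::runtime_error("load failed");
            }
            ++completed;
        } catch (...) {
            loading = false; // failure path still clears the flag
            throw;
        }
        loading = false; // success path
    }
};
```

In the real code the clearing happens in the `catch (...)` block before re-throwing and again after the success log line, which is the same shape as this sketch.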
class sstables_loader::download_task_impl : public tasks::task_manager::task::impl {
sharded<sstables_loader>& _loader;
sstring _endpoint;
sstring _bucket;
sstring _ks;
sstring _cf;
sstring _prefix;
sstables_loader::stream_scope _scope;
std::vector<sstring> _sstables;
const primary_replica_only _primary_replica;
struct progress_holder {
// Wrap stream_progress in a smart pointer to enable polymorphism.
// This allows derived progress types to be passed down for per-tablet
// progress tracking while maintaining the base interface.
shared_ptr<stream_progress> progress = make_shared<stream_progress>();
};
mutable shared_mutex _progress_mutex;
// The user could query the progress even before _progress_per_shard
// is fully started, and _status.state does not reflect the state of
// the progress, so we have to track it separately.
enum class progress_state {
uninitialized,
initialized,
finalized,
} _progress_state = progress_state::uninitialized;
sharded<progress_holder> _progress_per_shard;
tasks::task_manager::task::progress _final_progress;
protected:
virtual future<> run() override;
public:
download_task_impl(tasks::task_manager::module_ptr module, sharded<sstables_loader>& loader,
sstring endpoint, sstring bucket, sstring ks, sstring cf, sstring prefix, std::vector<sstring> sstables,
sstables_loader::stream_scope scope, primary_replica_only primary_replica) noexcept
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
, _loader(loader)
, _endpoint(std::move(endpoint))
, _bucket(std::move(bucket))
, _ks(std::move(ks))
, _cf(std::move(cf))
, _prefix(std::move(prefix))
, _scope(scope)
, _sstables(std::move(sstables))
, _primary_replica(primary_replica)
{
_status.progress_units = "batches";
}
virtual std::string type() const override {
return "download_sstables";
}
virtual tasks::is_internal is_internal() const noexcept override {
return tasks::is_internal::no;
}
virtual tasks::is_user_task is_user_task() const noexcept override {
return tasks::is_user_task::yes;
}
tasks::is_abortable is_abortable() const noexcept override {
return tasks::is_abortable::yes;
}
virtual future<> release_resources() noexcept override {
// preserve the final progress, so we can access it after the task is
// finished
_final_progress = co_await get_progress();
co_await with_lock(_progress_mutex, [this] -> future<> {
if (std::exchange(_progress_state, progress_state::finalized) == progress_state::initialized) {
co_await _progress_per_shard.stop();
}
});
}
virtual future<tasks::task_manager::task::progress> get_progress() const override {
co_return co_await with_shared(_progress_mutex, [this] -> future<tasks::task_manager::task::progress> {
switch (_progress_state) {
case progress_state::uninitialized:
co_return tasks::task_manager::task::progress{};
case progress_state::finalized:
co_return _final_progress;
case progress_state::initialized:
break;
}
auto p = co_await _progress_per_shard.map_reduce(
adder<stream_progress>{},
[] (const progress_holder& holder) -> stream_progress {
auto p = holder.progress;
SCYLLA_ASSERT(p);
return *p;
});
co_return tasks::task_manager::task::progress {
.completed = p.completed,
.total = p.total,
};
});
}
};
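`get_progress()` above sums the per-shard `stream_progress` values with `map_reduce` and an `adder`. A standalone sketch of that aggregation step, with an illustrative progress struct standing in for the real `stream_progress`:

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Illustrative stand-in for stream_progress: per-shard progress values
// are combined by plain addition into one cluster-wide figure.
struct stream_progress_sketch {
    double completed = 0;
    double total = 0;
    stream_progress_sketch operator+(const stream_progress_sketch& o) const {
        return {completed + o.completed, total + o.total};
    }
};

// Mirrors the map_reduce over _progress_per_shard: fold every shard's
// progress into a single accumulated value.
stream_progress_sketch aggregate(const std::vector<stream_progress_sketch>& per_shard) {
    return std::accumulate(per_shard.begin(), per_shard.end(), stream_progress_sketch{});
}
```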
future<> sstables_loader::download_task_impl::run() {
// Load-and-stream reads the entire content from SSTables, therefore it can afford to discard the bloom filter
// that might otherwise consume a significant amount of memory.
sstables::sstable_open_config cfg {
.load_bloom_filter = false,
};
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
auto ep_type = _loader.local()._storage_manager.get_endpoint_type(_endpoint);
std::vector<seastar::abort_source> shard_aborts(smp::count);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, ep_type, _bucket, _prefix, cfg, [&] {
return &shard_aborts[this_shard_id()];
});
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
std::exception_ptr ex;
named_gate g("sstables_loader::download_task_impl");
try {
_as.check();
auto s = _as.subscribe([&]() noexcept {
try {
auto h = g.hold();
(void)smp::invoke_on_all([&shard_aborts, ex = _as.abort_requested_exception_ptr()] {
shard_aborts[this_shard_id()].request_abort_ex(ex);
}).finally([h = std::move(h)] {});
} catch (...) {
}
});
co_await _progress_per_shard.start();
_progress_state = progress_state::initialized;
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), _primary_replica, false, _scope,
_progress_per_shard.local().progress);
});
} catch (...) {
ex = std::current_exception();
}
co_await g.close();
if (_as.abort_requested()) {
if (!ex) {
ex = _as.abort_requested_exception_ptr();
}
}
if (ex) {
co_await _loader.invoke_on_all([&sstables_on_shards] (sstables_loader&) {
sstables_on_shards[this_shard_id()] = {}; // clear on correct shard
});
co_await coroutine::return_exception_ptr(std::move(ex));
}
}
sstables_loader::sstables_loader(sharded<replica::database>& db,
sharded<service::storage_service>& ss,
netw::messaging_service& messaging,
sharded<db::view::view_builder>& vb,
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
sstables::storage_manager& sstm,
db::system_distributed_keyspace& sys_dist_ks,
seastar::scheduling_group sg)
: _db(db)
, _ss(ss)
, _messaging(messaging)
, _view_builder(vb)
, _view_building_worker(vbw)
, _task_manager_module(make_shared<task_manager_module>(tm))
, _storage_manager(sstm)
, _sys_dist_ks(sys_dist_ks)
, _sched_group(std::move(sg))
{
tm.register_module("sstables_loader", _task_manager_module);
ser::sstables_loader_rpc_verbs::register_restore_tablet(&_messaging, [this] (raft::server_id dst_id, locator::global_tablet_id gid) -> future<restore_result> {
return _ss.local().handle_raft_rpc(dst_id, [&sl = container(), gid] (auto& ss) {
return ss.do_tablet_operation(gid, "Restore", [&sl, gid] (locator::tablet_metadata_guard& guard) -> future<service::tablet_operation_result> {
co_await sl.local().download_tablet_sstables(gid, guard);
co_return service::tablet_operation_empty_result{};
}).then([] (auto res) {
return make_ready_future<restore_result>();
});
});
});
}
future<> sstables_loader::stop() {
co_await ser::sstables_loader_rpc_verbs::unregister(&_messaging);
co_await _task_manager_module->stop();
}
future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, sstring cf_name,
sstring prefix, std::vector<sstring> sstables,
sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica) {
if (!_storage_manager.is_known_endpoint(endpoint)) {
throw std::invalid_argument(format("endpoint {} not found", endpoint));
}
llog.info("Restore sstables from {}({}) to {}.{} using scope={}, primary_replica={}", endpoint, prefix, ks_name, cf_name, scope, primary_replica);
auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name),
std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
co_return task->id();
}
future<sstables::shared_sstable> sstables_loader::attach_sstable(table_id tid, const minimal_sst_info& min_info) const {
auto& db = _db.local();
auto& table = db.find_column_family(tid);
llog.debug("Adding downloaded SSTables to the table {} on shard {}", table.schema()->cf_name(), this_shard_id());
auto& sst_manager = table.get_sstables_manager();
auto sst = sst_manager.make_sstable(
table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format);
sst->set_sstable_level(0);
auto erm = table.get_effective_replication_map();
sstables::sstable_open_config cfg {
.unsealed_sstable = true,
.ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch(),
};
co_await sst->load(erm->get_sharder(*table.schema()), cfg);
if (!sst->sstable_identifier()) {
on_internal_error(llog, "sstable identifier is required for tablet restore");
}
co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, sst] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin());
co_await loading_sst->seal_sstable(writer_cfg.backup);
}
});
co_return sst;
}
future<> sstables_loader::download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard& guard) {
auto& tmap = guard.get_tablet_map();
auto* trinfo = tmap.get_tablet_transition_info(tid.tablet);
if (!trinfo) {
throw std::runtime_error(fmt::format("No transition info for tablet {}", tid));
}
if (!trinfo->restore_cfg) {
throw std::runtime_error(format("No restore config for tablet {} restore transition", tid));
}
locator::restore_config restore_cfg = *trinfo->restore_cfg;
llog.info("Downloading sstables for tablet {} from {}@{}/{}", tid, restore_cfg.snapshot_name, restore_cfg.endpoint, restore_cfg.bucket);
auto s = _db.local().find_schema(tid.table);
auto tablet_range = tmap.get_token_range(tid.tablet);
const auto& topo = guard.get_token_metadata()->get_topology();
auto keyspace_name = s->ks_name();
auto table_name = s->cf_name();
auto datacenter = topo.get_datacenter();
auto rack = topo.get_rack();
auto sst_infos = co_await _sys_dist_ks.get_snapshot_sstables(restore_cfg.snapshot_name, keyspace_name, table_name, datacenter, rack,
db::consistency_level::LOCAL_QUORUM, tablet_range.start().transform([] (auto& v) { return v.value(); }), tablet_range.end().transform([] (auto& v) { return v.value(); }));
llog.debug("{} SSTables found for tablet {}", sst_infos.size(), tid);
if (sst_infos.empty()) {
throw std::runtime_error(format("No SSTables found in system_distributed.snapshot_sstables for {}", restore_cfg.snapshot_name));
}
auto [ fully, partially ] = co_await get_sstables_for_tablet(sst_infos, tablet_range, [] (const auto& si) { return si.first_token; }, [] (const auto& si) { return si.last_token; });
if (!partially.empty()) {
llog.debug("Sstable {} is partially contained", partially.front().sstable_id);
throw std::logic_error("sstables_partially_contained");
}
llog.debug("{} SSTables filtered by range {} for tablet {}", fully.size(), tablet_range, tid);
co_await utils::get_local_injector().inject("pause_tablet_restore", utils::wait_for_message(60s));
if (fully.empty()) {
// It can happen that a tablet exists and contains no data. Just skip it
co_return;
}
std::unordered_map<sstring, std::vector<sstring>> toc_names_by_prefix;
for (const auto& e : fully) {
toc_names_by_prefix[e.prefix].emplace_back(e.toc_name);
}
auto ep_type = _storage_manager.get_endpoint_type(restore_cfg.endpoint);
sstables::sstable_open_config cfg {
.load_bloom_filter = false,
};
using sstables_col = std::vector<sstables::shared_sstable>;
using prefix_sstables = std::vector<sstables_col>;
auto sstables_on_shards = co_await map_reduce(toc_names_by_prefix, [&] (auto ent) {
return replica::distributed_loader::get_sstables_from_object_store(_db, s->ks_name(), s->cf_name(),
std::move(ent.second), restore_cfg.endpoint, ep_type, restore_cfg.bucket, std::move(ent.first), cfg, [&] { return nullptr; }).then_unpack([] (table_id, auto sstables) {
return make_ready_future<std::vector<sstables_col>>(std::move(sstables));
});
}, std::vector<prefix_sstables>(smp::count), [&] (std::vector<prefix_sstables> a, std::vector<sstables_col> b) {
// We can't move individual elements of b[i], because these
// are lw_shared_ptr-s collected on another shard. So we move
// the whole sstables_col here so that subsequent code will
// walk over it and move the pointers where it wants on the
// proper shard.
for (unsigned i = 0; i < smp::count; i++) {
a[i].push_back(std::move(b[i]));
}
return a;
});
auto downloaded_ssts = co_await container().map_reduce0(
[tid, &sstables_on_shards](auto& loader) -> future<std::vector<std::vector<minimal_sst_info>>> {
sstables_col sst_chunk;
for (auto& psst : sstables_on_shards[this_shard_id()]) {
for (auto&& sst : psst) {
sst_chunk.push_back(std::move(sst));
}
}
std::vector<std::vector<minimal_sst_info>> local_min_infos(smp::count);
co_await max_concurrent_for_each(sst_chunk, 16, [&loader, tid, &local_min_infos](const auto& sst) -> future<> {
auto& table = loader._db.local().find_column_family(tid.table);
auto min_info = co_await download_sstable(loader._db.local(), table, sst, llog);
local_min_infos[min_info.shard].emplace_back(std::move(min_info));
});
co_return local_min_infos;
},
std::vector<std::vector<minimal_sst_info>>(smp::count),
[](auto init, auto&& item) -> std::vector<std::vector<minimal_sst_info>> {
for (std::size_t i = 0; i < item.size(); ++i) {
init[i].append_range(std::move(item[i]));
}
return init;
});
co_await container().invoke_on_all([tid, &downloaded_ssts, snap_name = restore_cfg.snapshot_name, keyspace_name, table_name, datacenter, rack] (auto& loader) -> future<> {
auto shard_ssts = std::move(downloaded_ssts[this_shard_id()]);
co_await max_concurrent_for_each(shard_ssts, 16, [&loader, tid, snap_name, keyspace_name, table_name, datacenter, rack](const auto& min_info) -> future<> {
sstables::shared_sstable attached_sst = co_await loader.attach_sstable(tid.table, min_info);
co_await loader._sys_dist_ks.update_sstable_download_status(snap_name,
keyspace_name,
table_name,
datacenter,
rack,
*attached_sst->sstable_identifier(),
attached_sst->get_first_decorated_key().token(),
db::is_downloaded::yes);
});
});
}
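The first `map_reduce` above merges per-shard sstable collections bucket by bucket, so each shard can later pick up its own pointers. A minimal sketch of that reduce step outside Seastar (plain `int` stands in for `shared_sstable`, and the shard count is a placeholder):

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Merge one mapper's per-shard result ("item") into the accumulator
// ("init"), index i being the destination shard. This mirrors the
// reduce lambda in download_tablet_sstables(), except the real code
// pushes whole collections instead of splicing elements, because the
// lw_shared_ptr-s must only be moved on their owning shard.
std::vector<std::vector<int>> merge_by_shard(std::vector<std::vector<int>> init,
                                             std::vector<std::vector<int>> item) {
    for (std::size_t i = 0; i < item.size(); ++i) {
        init[i].insert(init[i].end(),
                       std::make_move_iterator(item[i].begin()),
                       std::make_move_iterator(item[i].end()));
    }
    return init;
}
```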
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges));
}
static future<size_t> process_manifest(input_stream<char>& is, sstring keyspace, sstring table,
const sstring& expected_snapshot_name,
const sstring& manifest_prefix, db::system_distributed_keyspace& sys_dist_ks,
db::consistency_level cl) {
// Read the entire JSON content
rjson::chunked_content content = co_await util::read_entire_stream(is);
rjson::value parsed = rjson::parse(std::move(content));
// Basic validation that tablet_count is a power of 2, as expected by our restore process
size_t tablet_count = parsed["table"]["tablet_count"].GetUint64();
if (!std::has_single_bit(tablet_count)) {
on_internal_error(llog, fmt::format("Invalid tablet_count {} in manifest {}, expected a power of 2", tablet_count, manifest_prefix));
}
// Extract the necessary fields from the manifest
// Expected JSON structure documented in docs/dev/object_storage.md
auto snapshot_name = rjson::to_sstring(parsed["snapshot"]["name"]);
if (snapshot_name != expected_snapshot_name) {
throw std::runtime_error(fmt::format("Manifest {} belongs to snapshot '{}', expected '{}'",
manifest_prefix, snapshot_name, expected_snapshot_name));
}
if (keyspace.empty()) {
keyspace = rjson::to_sstring(parsed["table"]["keyspace_name"]);
}
if (table.empty()) {
table = rjson::to_sstring(parsed["table"]["table_name"]);
}
auto datacenter = rjson::to_sstring(parsed["node"]["datacenter"]);
auto rack = rjson::to_sstring(parsed["node"]["rack"]);
// Process each sstable entry in the manifest
// FIXME: cleanup of the snapshot-related rows is needed in case anything throws in here.
auto sstables = rjson::find(parsed, "sstables");
if (!sstables) {
co_return tablet_count;
}
if (!sstables->IsArray()) {
throw std::runtime_error("Malformed manifest, 'sstables' is not array");
}
for (auto& sstable_entry : sstables->GetArray()) {
auto id = rjson::to_sstable_id(sstable_entry["id"]);
auto first_token = rjson::to_token(sstable_entry["first_token"]);
auto last_token = rjson::to_token(sstable_entry["last_token"]);
auto toc_name = rjson::to_sstring(sstable_entry["toc_name"]);
auto prefix = sstring(std::filesystem::path(manifest_prefix).parent_path().string());
// Insert the snapshot sstable metadata into system_distributed.snapshot_sstables with a TTL of 3 days, which should be enough
// for any snapshot restore operation to complete; after that the metadata is automatically cleaned up from the table.
co_await sys_dist_ks.insert_snapshot_sstable(snapshot_name, keyspace, table, datacenter, rack, id, first_token, last_token,
toc_name, prefix, db::is_downloaded::no, cl);
}
co_return tablet_count;
}
future<size_t> populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring keyspace, sstring table, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector<sstring> manifest_prefixes, db::consistency_level cl) {
// Download manifests in parallel and populate system_distributed.snapshot_sstables
// with the content extracted from each manifest
auto client = sm.get_endpoint_client(endpoint);
// tablet_count to be returned by this function; we also validate that all manifests passed contain the same tablet count
std::optional<size_t> tablet_count;
co_await seastar::max_concurrent_for_each(manifest_prefixes, 16, [&] (const sstring& manifest_prefix) {
// Download the manifest JSON file
sstables::object_name name(bucket, manifest_prefix);
auto source = client->make_download_source(name);
return seastar::with_closeable(input_stream<char>(std::move(source)), [&] (input_stream<char>& is) {
return process_manifest(is, keyspace, table, expected_snapshot_name, manifest_prefix, sys_dist_ks, cl).then([&](size_t count) {
if (!tablet_count) {
tablet_count = count;
} else if (*tablet_count != count) {
throw std::runtime_error(fmt::format("Inconsistent tablet_count values in manifest {}: expected {}, got {}", manifest_prefix, *tablet_count, count));
}
});
});
});
co_return *tablet_count;
}
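The cross-manifest `tablet_count` check above seeds the expected value from the first manifest and requires every later one to agree. A minimal sketch of that first-wins consistency pattern (names are illustrative):

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <vector>

// The first value seen becomes the expectation; any later disagreement
// is an error, mirroring the tablet_count cross-check in
// populate_snapshot_sstables_from_manifests().
std::size_t require_consistent(const std::vector<std::size_t>& counts) {
    std::optional<std::size_t> expected;
    for (auto c : counts) {
        if (!expected) {
            expected = c;
        } else if (*expected != c) {
            throw std::runtime_error("inconsistent tablet_count");
        }
    }
    return *expected;
}
```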
class sstables_loader::tablet_restore_task_impl : public tasks::task_manager::task::impl {
sharded<sstables_loader>& _loader;
table_id _tid;
sstring _snap_name;
sstring _endpoint;
sstring _bucket;
public:
tablet_restore_task_impl(tasks::task_manager::module_ptr module, sharded<sstables_loader>& loader, sstring ks,
table_id tid, sstring snap_name, sstring endpoint, sstring bucket) noexcept
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
, _loader(loader)
, _tid(std::move(tid))
, _snap_name(std::move(snap_name))
, _endpoint(std::move(endpoint))
, _bucket(std::move(bucket))
{
_status.progress_units = "batches";
}
virtual std::string type() const override {
return "restore_tablets";
}
virtual tasks::is_internal is_internal() const noexcept override {
return tasks::is_internal::no;
}
virtual tasks::is_user_task is_user_task() const noexcept override {
return tasks::is_user_task::yes;
}
tasks::is_abortable is_abortable() const noexcept override {
return tasks::is_abortable::no;
}
protected:
virtual future<> run() override {
auto& loader = _loader.local();
co_await loader._ss.local().restore_tablets(_tid, _snap_name, _endpoint, _bucket);
auto& db = loader._db.local();
auto s = db.find_schema(_tid);
const auto& topo = db.get_token_metadata().get_topology();
auto dc = topo.get_datacenter();
for (const auto& rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
auto result = co_await loader._sys_dist_ks.get_snapshot_sstables(_snap_name, s->ks_name(), s->cf_name(), dc, rack);
auto it = std::find_if(result.begin(), result.end(), [] (const auto& ent) { return !ent.downloaded; });
if (it != result.end()) {
llog.warn("Some replicas failed to download SSTables for {}:{} from {}", s->ks_name(), s->cf_name(), _snap_name);
throw std::runtime_error(format("Failed to download {}", it->toc_name));
}
}
}
};
future<tasks::task_id> sstables_loader::restore_tablets(table_id tid, sstring keyspace, sstring table, sstring snap_name, sstring endpoint, sstring bucket, utils::chunked_vector<sstring> manifests) {
co_await populate_snapshot_sstables_from_manifests(_storage_manager, _sys_dist_ks, keyspace, table, endpoint, bucket, snap_name, std::move(manifests));
auto task = co_await _task_manager_module->make_and_start_task<tablet_restore_task_impl>({}, container(), keyspace, tid, std::move(snap_name), std::move(endpoint), std::move(bucket));
co_return task->id();
}