mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 09:00:35 +00:00
When storage_service::restore_tablets() resolves, it only means that tablet transitions are done, including restore transitions, but not necessarily that they succeeded. So before resolving the restoration task with success, we need to check whether all sstables were downloaded and, if not, resolve the task with an exception. Test included. It uses fault injection to abort the download of a single sstable early, then checks that the error was properly propagated back to the task-waiting API. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
1187 lines
57 KiB
C++
1187 lines
57 KiB
C++
/*
|
|
* Copyright (C) 2021-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
|
*/
|
|
|
|
#include <fmt/ranges.h>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/map_reduce.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/shared_mutex.hh>
|
|
#include <seastar/core/units.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include <seastar/coroutine/switch_to.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
#include <seastar/rpc/rpc.hh>
|
|
#include "sstables_loader.hh"
|
|
#include "db/config.hh"
|
|
#include "dht/auto_refreshing_sharder.hh"
|
|
#include "replica/distributed_loader.hh"
|
|
#include "replica/database.hh"
|
|
#include "sstables/sstables_manager.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "gms/inet_address.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "streaming/stream_mutation_fragments_cmd.hh"
|
|
#include "streaming/stream_reason.hh"
|
|
#include "readers/mutation_fragment_v1_stream.hh"
|
|
#include "locator/abstract_replication_strategy.hh"
|
|
#include "message/messaging_service.hh"
|
|
#include "service/storage_service.hh"
|
|
#include "sstables_loader_helpers.hh"
|
|
#include "db/system_distributed_keyspace.hh"
|
|
#include "idl/sstables_loader.dist.hh"
|
|
|
|
#include "sstables/object_storage_client.hh"
|
|
#include "utils/rjson.hh"
|
|
#include "db/system_distributed_keyspace.hh"
|
|
|
|
#include <cfloat>
|
|
#include <algorithm>
|
|
|
|
static logging::logger llog("sstables_loader");
|
|
|
|
namespace {
|
|
|
|
// Manages one per-target-node bidirectional RPC stream used by load-and-stream:
// mutation fragments flow out through _sink, while per-batch status codes from
// the peer flow back through _source. The receive side runs concurrently with
// the sends and is joined in finish().
class send_meta_data {
    locator::host_id _node;
    seastar::rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> _sink;
    seastar::rpc::source<int32_t> _source;
    // Whether the peer understands stream_mutation_fragments_cmd::abort.
    const bool _abort_supported = false;
    // Set when the peer reports status -1; checked on every send().
    bool _error_from_peer = false;
    size_t _num_partitions_sent = 0;
    size_t _num_bytes_sent = 0;
    // Resolves when do_receive() has fully drained _source; joined in finish().
    future<> _receive_done;
private:
    // Drain status codes from the peer until the source closes. A status of -1
    // marks the peer as failed; the failure is surfaced as an exception only
    // after the source has been fully drained.
    future<> do_receive() {
        int32_t status = 0;
        while (auto status_opt = co_await _source()) {
            status = std::get<0>(*status_opt);
            llog.debug("send_meta_data: got error code={}, from node={}", status, _node);
            if (status == -1) {
                _error_from_peer = true;
            }
        }
        llog.debug("send_meta_data: finished reading source from node={}", _node);
        if (_error_from_peer) {
            throw std::runtime_error(format("send_meta_data: got error code={} from node={}", status, _node));
        }
        co_return;
    }
public:
    send_meta_data(locator::host_id node,
            seastar::rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink,
            seastar::rpc::source<int32_t> source, bool abort_supported)
        : _node(std::move(node))
        , _sink(std::move(sink))
        , _source(std::move(source))
        , _abort_supported(abort_supported)
        , _receive_done(make_ready_future<>()) {
    }
    // Start draining the peer's status stream in the background.
    void receive() {
        _receive_done = do_receive();
    }
    // Send one frozen mutation fragment to the peer. Throws if the peer has
    // already reported an error. Byte/partition counters feed the final
    // per-node summary logged by the caller.
    future<> send(const frozen_mutation_fragment& fmf, bool is_partition_start) {
        if (_error_from_peer) {
            throw std::runtime_error(format("send_meta_data: got error from peer node={}", _node));
        }
        auto size = fmf.representation().size();
        if (is_partition_start) {
            ++_num_partitions_sent;
        }
        _num_bytes_sent += size;
        llog.trace("send_meta_data: send mf to node={}, size={}", _node, size);
        co_return co_await _sink(fmf, streaming::stream_mutation_fragments_cmd::mutation_fragment_data);
    }
    // Terminate the stream: send the final command (abort/error/end_of_stream),
    // close the sink, and join the receive loop. All three steps are attempted
    // even if earlier ones fail, so resources are always released; the last
    // captured exception (if any) is rethrown at the end.
    future<> finish(bool failed, bool aborted) {
        std::exception_ptr eptr;
        try {
            if (_abort_supported && aborted) {
                co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::abort);
            } else if (failed) {
                co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::error);
            } else {
                co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::end_of_stream);
            }
        } catch (...) {
            eptr = std::current_exception();
            llog.warn("send_meta_data: failed to send {} to node={}, err={}",
                    failed ? "stream_mutation_fragments_cmd::error" : "stream_mutation_fragments_cmd::end_of_stream", _node, eptr);
        }
        try {
            co_await _sink.close();
        } catch (...) {
            eptr = std::current_exception();
            llog.warn("send_meta_data: failed to close sink to node={}, err={}", _node, eptr);
        }
        try {
            co_await std::move(_receive_done);
        } catch (...) {
            eptr = std::current_exception();
            llog.warn("send_meta_data: failed to process source from node={}, err={}", _node, eptr);
        }
        if (eptr) {
            std::rethrow_exception(eptr);
        }
        co_return;
    }
    size_t num_partitions_sent() {
        return _num_partitions_sent;
    }
    size_t num_bytes_sent() {
        return _num_bytes_sent;
    }
};
|
|
|
|
} // anonymous namespace
|
|
|
|
using primary_replica_only = bool_class<struct primary_replica_only_tag>;
|
|
using unlink_sstables = bool_class<struct unlink_sstables_tag>;
|
|
|
|
// Streams a set of local SSTables to their owning replicas (vnode-based
// tables). Holds the effective_replication_map for its whole lifetime, which
// pins topology so token-to-replica mapping stays stable while streaming.
class sstable_streamer {
protected:
    using stream_scope = sstables_loader::stream_scope;
    netw::messaging_service& _ms;
    replica::database& _db;
    replica::table& _table;
    // Keeps topology alive/stable; derived classes also read tablet metadata
    // through it.
    locator::effective_replication_map_ptr _erm;
    std::vector<sstables::shared_sstable> _sstables;
    // Restrict streaming to the primary replica of each token only.
    const primary_replica_only _primary_replica_only;
    // Delete source SSTables after a successful stream.
    const unlink_sstables _unlink_sstables;
    // Which replicas are eligible targets: all / dc / rack / node.
    const stream_scope _stream_scope;
public:
    sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
            std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
        : _ms(ms)
        , _db(db)
        , _table(db.find_column_family(table_id))
        , _erm(std::move(erm))
        , _sstables(std::move(sstables))
        , _primary_replica_only(primary)
        , _unlink_sstables(unlink)
        , _stream_scope(scope)
    {
        // By sorting SSTables by their primary key, we allow SSTable runs to be
        // incrementally streamed.
        // Overlapping run fragments can have their content deduplicated, reducing
        // the amount of data we need to put on the wire.
        // Elements are popped off from the back of the vector, therefore we're sorting
        // it in descending order, to start from the smaller tokens.
        std::ranges::sort(_sstables, [] (const sstables::shared_sstable& x, const sstables::shared_sstable& y) {
            return x->compare_by_first_key(*y) > 0;
        });
    }

    virtual ~sstable_streamer() {}

    // Stream all SSTables; `progress` (may be null) is advanced per batch.
    virtual future<> stream(shared_ptr<stream_progress> progress);
    // Replica set for `token`, filtered by scope and primary-only settings.
    host_id_vector_replica_set get_endpoints(const dht::token& token) const;
    future<> stream_sstable_mutations(streaming::plan_id, const dht::partition_range&, std::vector<sstables::shared_sstable>);
protected:
    virtual host_id_vector_replica_set get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const;
    // Streams `sstables` restricted to partition range `pr`, in batches.
    future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>, shared_ptr<stream_progress> progress);
private:
    // Natural plus pending replicas for `token`, unfiltered.
    host_id_vector_replica_set get_all_endpoints(const dht::token& token) const;
};
|
|
|
|
class tablet_sstable_streamer : public sstable_streamer {
|
|
sharded<replica::database>& _db;
|
|
const locator::tablet_map& _tablet_map;
|
|
public:
|
|
tablet_sstable_streamer(netw::messaging_service& ms, sharded<replica::database>& db, ::table_id table_id, locator::effective_replication_map_ptr erm,
|
|
std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, unlink_sstables unlink, stream_scope scope)
|
|
: sstable_streamer(ms, db.local(), table_id, std::move(erm), std::move(sstables), primary, unlink, scope)
|
|
, _db(db)
|
|
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
|
|
}
|
|
|
|
virtual future<> stream(shared_ptr<stream_progress> on_streamed) override;
|
|
virtual host_id_vector_replica_set get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const override;
|
|
|
|
private:
|
|
host_id_vector_replica_set to_replica_set(const locator::tablet_replica_set& replicas) const {
|
|
host_id_vector_replica_set result;
|
|
result.reserve(replicas.size());
|
|
for (auto&& replica : replicas) {
|
|
result.push_back(replica.host);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
using sst_classification_info = std::vector<std::vector<minimal_sst_info>>;
|
|
|
|
future<> attach_sstable(shard_id from_shard, const sstring& ks, const sstring& cf, const minimal_sst_info& min_info) const {
|
|
llog.debug("Adding downloaded SSTables to the table {} on shard {}, submitted from shard {}", _table.schema()->cf_name(), this_shard_id(), from_shard);
|
|
auto& db = _db.local();
|
|
auto& table = db.find_column_family(ks, cf);
|
|
auto& sst_manager = table.get_sstables_manager();
|
|
auto sst = sst_manager.make_sstable(
|
|
table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format);
|
|
sst->set_sstable_level(0);
|
|
auto units = co_await sst_manager.dir_semaphore().get_units(1);
|
|
sstables::sstable_open_config cfg {
|
|
.unsealed_sstable = true,
|
|
.ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch(),
|
|
};
|
|
co_await sst->load(table.get_effective_replication_map()->get_sharder(*table.schema()), cfg);
|
|
co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, sst] (sstables::shared_sstable loading_sst) -> future<> {
|
|
if (loading_sst == sst) {
|
|
auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin());
|
|
co_await loading_sst->seal_sstable(writer_cfg.backup);
|
|
}
|
|
});
|
|
}
|
|
|
|
future<>
|
|
stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, shared_ptr<stream_progress> progress) {
|
|
if (_stream_scope != stream_scope::node) {
|
|
co_return co_await stream_sstables(pr, std::move(sstables), std::move(progress));
|
|
}
|
|
llog.debug("Directly downloading {} fully contained SSTables to local node from object storage.", sstables.size());
|
|
auto downloaded_ssts = co_await download_fully_contained_sstables(std::move(sstables));
|
|
|
|
co_await smp::invoke_on_all(
|
|
[this, &downloaded_ssts, from = this_shard_id(), ks = _table.schema()->ks_name(), cf = _table.schema()->cf_name()] -> future<> {
|
|
auto shard_ssts = std::move(downloaded_ssts[this_shard_id()]);
|
|
for (const auto& min_info : shard_ssts) {
|
|
co_await attach_sstable(from, ks, cf, min_info);
|
|
}
|
|
});
|
|
if (progress) {
|
|
progress->advance(std::accumulate(downloaded_ssts.cbegin(), downloaded_ssts.cend(), 0., [](float acc, const auto& v) { return acc + v.size(); }));
|
|
}
|
|
}
|
|
|
|
future<sst_classification_info> download_fully_contained_sstables(std::vector<sstables::shared_sstable> sstables) const {
|
|
sst_classification_info downloaded_sstables(smp::count);
|
|
for (const auto& sstable : sstables) {
|
|
auto min_info = co_await download_sstable(_db.local(), _table, sstable, llog);
|
|
downloaded_sstables[min_info.shard].emplace_back(min_info);
|
|
}
|
|
co_return downloaded_sstables;
|
|
}
|
|
|
|
bool tablet_in_scope(locator::tablet_id) const;
|
|
|
|
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
|
|
std::vector<dht::token_range>&& tablets_ranges);
|
|
// Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from
|
|
// the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else.
|
|
static future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
|
|
std::vector<dht::token_range>&& tablets_ranges);
|
|
};
|
|
|
|
// Returns the replica set for `token`, restricted by the configured stream
// scope (all/dc/rack/node) and optionally reduced to primary replicas only.
host_id_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const {
    auto host_filter = [&topo = _erm->get_topology(), scope = _stream_scope] (const locator::host_id& ep) {
        switch (scope) {
        case stream_scope::all:
            return true;
        case stream_scope::dc:
            // Same datacenter as the local node.
            return topo.get_datacenter(ep) == topo.get_datacenter();
        case stream_scope::rack:
            // Same dc+rack (full location) as the local node.
            return topo.get_location(ep) == topo.get_location();
        case stream_scope::node:
            return topo.is_me(ep);
        }
        // All enum values are handled above; control never falls off the end.
    };

    if (_primary_replica_only) {
        // Primary-only + node scope is unsatisfiable in general (the primary
        // replica may live elsewhere), so it is rejected outright.
        if (_stream_scope == stream_scope::node) {
            throw std::runtime_error("Node scoped streaming of primary replica only is not supported");
        }
        return get_primary_endpoints(token, std::move(host_filter));
    }
    return get_all_endpoints(token) | std::views::filter(std::move(host_filter)) | std::ranges::to<host_id_vector_replica_set>();
}
|
|
|
|
// All replicas owning `token`: the natural replicas followed by any pending
// ones (nodes in the process of acquiring the range).
host_id_vector_replica_set sstable_streamer::get_all_endpoints(const dht::token& token) const {
    auto targets = _erm->get_natural_replicas(token);
    for (auto&& pending_replica : _erm->get_pending_replicas(token)) {
        targets.push_back(std::move(pending_replica));
    }
    return targets;
}
|
|
|
|
// First natural replica of `token` that passes `filter`.
// NOTE(review): resize(1) mirrors the original semantics — if no replica
// passes the filter, it yields a single default-constructed host id; callers
// presumably guarantee at least one match. Confirm before changing.
host_id_vector_replica_set sstable_streamer::get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const {
    host_id_vector_replica_set primaries;
    for (const auto& replica : _erm->get_natural_replicas(token)) {
        if (filter(replica)) {
            primaries.push_back(replica);
            break;
        }
    }
    primaries.resize(1);
    return primaries;
}
|
|
|
|
// Primary replicas of the tablet owning `token`, restricted to hosts that
// pass `filter`.
host_id_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const {
    const auto tablet = _tablet_map.get_tablet_id(token);
    auto host_passes = [f = std::move(filter)] (const locator::tablet_replica& replica) {
        return f(replica.host);
    };
    auto primaries = locator::get_primary_replicas(_tablet_map, tablet, _erm->get_topology(), std::move(host_passes));
    return to_replica_set(primaries);
}
|
|
|
|
// Stream every SSTable across the whole token ring (vnode path): a single
// call covering the full, open-ended partition range.
future<> sstable_streamer::stream(shared_ptr<stream_progress> progress) {
    if (progress) {
        progress->start(_sstables.size());
    }
    const auto whole_ring = dht::partition_range::make_open_ended_both_sides();
    co_await stream_sstables(whole_ring, std::move(_sstables), std::move(progress));
}
|
|
|
|
// A tablet is in scope when any of its replicas matches the stream scope:
// the local node, the local rack, the local dc, or (trivially) everything.
bool tablet_sstable_streamer::tablet_in_scope(locator::tablet_id tid) const {
    if (_stream_scope == stream_scope::all) {
        return true;
    }

    const auto& topo = _erm->get_topology();
    auto replica_matches = [&] (const locator::tablet_replica& r) {
        switch (_stream_scope) {
        case stream_scope::node:
            return topo.is_me(r.host);
        case stream_scope::rack:
            return topo.get_location(r.host) == topo.get_location();
        case stream_scope::dc:
            return topo.get_datacenter(r.host) == topo.get_datacenter();
        case stream_scope::all: // handled by the early return, kept for enum coverage
            return true;
        }
        return false; // unreachable: all enum values handled above
    };
    return std::ranges::any_of(_tablet_map.get_tablet_info(tid).replicas, replica_matches);
}
|
|
|
|
// The tablet_sstable_streamer implements a hierarchical streaming strategy:
|
|
//
|
|
// 1. Top Level (Per-Tablet Streaming):
|
|
// - Unlike vnode streaming, this streams sstables on a tablet-by-tablet basis
|
|
// - For a table with M tablets, each tablet[i] maps to its own set of SSTable files
|
|
// stored in tablet_to_sstables[i]
|
|
// - If tablet_to_sstables[i] is empty, that tablet's streaming is considered complete
|
|
// - Progress tracking advances by 1.0 unit when an entire tablet completes streaming
|
|
//
|
|
// 2. Inner Level (Per-SSTable Streaming):
|
|
// - Within each tablet's batch, individual SSTables are streamed in smaller sub-batches
|
|
// - The per_tablet_stream_progress class tracks streaming progress at this level:
|
|
// - Updates when a set of SSTables completes streaming
|
|
// - For n completed SSTables, advances by (n / total_sstables_in_current_tablet)
|
|
// - Provides granular tracking for the inner level streaming operations
|
|
// - Helps estimate completion time for the current tablet's batch
|
|
//
|
|
// Progress Tracking:
|
|
// The streaming progress is monitored at two granularity levels:
|
|
// - Tablet level: Overall progress where each tablet contributes 1.0 units
|
|
// - SSTable level: Progress of individual SSTable transfers within a tablet,
|
|
// managed by the per_tablet_stream_progress class
|
|
//
|
|
// Note: For simplicity, we assume uniform streaming time across tablets, even though
|
|
// tablets may vary significantly in their SSTable count or size. This assumption
|
|
// helps in progress estimation without requiring prior knowledge of SSTable
|
|
// distribution across tablets.
|
|
struct per_tablet_stream_progress : public stream_progress {
private:
    // Table-level progress; this tablet contributes at most 1.0 unit to it.
    shared_ptr<stream_progress> _per_table_progress;
    // Total SSTables (fully + partially contained) mapped to this tablet;
    // the denominator when scaling per-sstable progress.
    const size_t _num_sstables_mapped;
public:
    per_tablet_stream_progress(shared_ptr<stream_progress> per_table_progress,
                               size_t num_sstables_mapped)
        : _per_table_progress(std::move(per_table_progress))
        , _num_sstables_mapped(num_sstables_mapped) {
        if (_per_table_progress && _num_sstables_mapped == 0) {
            // consider this tablet completed if nothing to stream
            _per_table_progress->advance(1.0);
        }
    }
    // Convert `num_sstable_streamed` completed SSTables into this tablet's
    // fractional share of the table-level progress.
    void advance(float num_sstable_streamed) override {
        // we should not move backward
        assert(num_sstable_streamed >= 0.);
        // we should call advance() only if the current tablet maps to at least
        // one sstable.
        assert(_num_sstables_mapped > 0);
        if (_per_table_progress) {
            _per_table_progress->advance(num_sstable_streamed / _num_sstables_mapped);
        }
    }
};
|
|
|
|
// Bucket `sstables` per tablet range: for each range, which SSTables are
// fully contained in it and which only partially overlap it. Ranges that map
// to no SSTables come back with empty buckets.
future<std::vector<tablet_sstable_collection>> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
        std::vector<dht::token_range>&& tablets_ranges) {
    auto tablets_sstables =
        tablets_ranges | std::views::transform([](auto range) { return tablet_sstable_collection{.tablet_range = range}; }) | std::ranges::to<std::vector>();
    if (sstables.empty() || tablets_sstables.empty()) {
        co_return std::move(tablets_sstables);
    }
    // sstables are sorted by first key in reverse order.
    auto reversed_sstables = sstables | std::views::reverse;

    // Classify per range; get_sstables_for_tablet() consumes the ascending
    // view using the first/last token extractors passed in.
    for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : tablets_sstables) {
        auto [fully, partially] = co_await get_sstables_for_tablet(reversed_sstables, tablet_range, [](const auto& sst) { return sst->get_first_decorated_key().token(); }, [](const auto& sst) { return sst->get_last_decorated_key().token(); });
        sstables_fully_contained = std::move(fully);
        sstables_partially_contained = std::move(partially);
    }
    co_return std::move(tablets_sstables);
}
|
|
|
|
// Stream tablet by tablet (see the strategy description above): partially
// contained SSTables go through regular mutation streaming limited to the
// tablet's partition range; fully contained ones may take the direct-download
// fast path. Each tablet gets its own per_tablet_stream_progress wrapper.
future<> tablet_sstable_streamer::stream(shared_ptr<stream_progress> progress) {
    if (progress) {
        // One unit of progress per tablet, whether or not it maps to SSTables.
        progress->start(_tablet_map.tablet_count());
    }

    // Only tablets whose replicas fall within the stream scope are considered.
    auto classified_sstables = co_await get_sstables_for_tablets(
        _sstables, _tablet_map.tablet_ids() | std::views::filter([this](auto tid) { return tablet_in_scope(tid); }) | std::views::transform([this](auto tid) {
            return _tablet_map.get_token_range(tid);
        }) | std::ranges::to<std::vector>());

    for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : classified_sstables) {
        auto per_tablet_progress = make_shared<per_tablet_stream_progress>(
            progress,
            sstables_fully_contained.size() + sstables_partially_contained.size());
        auto tablet_pr = dht::to_partition_range(tablet_range);
        if (!sstables_partially_contained.empty()) {
            llog.debug("Streaming {} partially contained SSTables.",sstables_partially_contained.size());
            co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), per_tablet_progress);
        }
        if (!sstables_fully_contained.empty()) {
            llog.debug("Streaming {} fully contained SSTables.",sstables_fully_contained.size());
            co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), per_tablet_progress);
        }
    }
}
|
|
|
|
// Stream `sstables` (restricted to `pr`) in batches of up to 16. The vector
// is sorted in descending first-key order (see the constructor), so batches
// are popped from the back to proceed from the smallest tokens upward.
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables, shared_ptr<stream_progress> progress) {
    size_t nr_sst_total = sstables.size();
    size_t nr_sst_current = 0;

    while (!sstables.empty()) {
        const size_t batch_sst_nr = std::min(16uz, sstables.size());
        // Take the batch from the back in ascending-token order...
        auto sst_processed = sstables
            | std::views::reverse
            | std::views::take(batch_sst_nr)
            | std::ranges::to<std::vector>();
        // ...and drop exactly those elements from the tail.
        sstables.erase(sstables.end() - batch_sst_nr, sstables.end());

        // Each batch is a separate streaming plan with its own uuid.
        auto ops_uuid = streaming::plan_id{utils::make_random_uuid()};
        llog.info("load_and_stream: started ops_uuid={}, process [{}-{}] out of {} sstables=[{}]",
                ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total,
                fmt::join(sst_processed | std::views::transform([] (auto sst) { return sst->get_filename(); }), ", "));
        nr_sst_current += sst_processed.size();
        co_await stream_sstable_mutations(ops_uuid, pr, std::move(sst_processed));
        if (progress) {
            progress->advance(batch_sst_nr);
        }
    }
}
|
|
|
|
// Core of load-and-stream for one batch: read the batch's SSTables as a
// single merged mutation stream over `pr`, and forward each fragment to the
// replicas owning its partition, opening one RPC sink/source pair per target
// node lazily. Runs in phases — send, finish (close all sinks), optional
// source-sstable deletion — each phase capturing errors so cleanup always
// completes; the first failure is rethrown at the end.
future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid, const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
    const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token));
    auto s = _table.schema();
    const auto cf_id = s->id();
    const auto reason = streaming::stream_reason::repair;

    // Combine the batch into one sstable_set so the reader merges fragments
    // across SSTables; also estimate partition count for the RPC handshake.
    auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, std::move(token_range)));
    size_t estimated_partitions = 0;
    for (auto& sst : sstables) {
        estimated_partitions += co_await sst->estimated_keys_for_range(token_range);
        sst_set->insert(sst);
    }

    auto start_time = std::chrono::steady_clock::now();
    // Targets of the partition currently being streamed; refreshed at each
    // partition-start fragment.
    host_id_vector_replica_set current_targets;
    // One send_meta_data (sink/source pair) per target node, created lazily.
    std::unordered_map<locator::host_id, send_meta_data> metas;
    size_t num_partitions_processed = 0;
    size_t num_bytes_read = 0;
    auto permit = co_await _db.obtain_reader_permit(_table, "sstables_loader::load_and_stream()", db::no_timeout, {});
    auto reader = mutation_fragment_v1_stream(_table.make_streaming_reader(s, std::move(permit), pr, sst_set, gc_clock::now()));
    std::exception_ptr eptr;
    bool failed = false;

    // Phase 1: send. Errors are recorded, not thrown, so the reader and all
    // sinks still get closed below.
    try {
        while (auto mf = co_await reader()) {
            bool is_partition_start = mf->is_partition_start();
            if (is_partition_start) {
                ++num_partitions_processed;
                auto& start = mf->as_partition_start();
                const auto& current_dk = start.key();

                current_targets = get_endpoints(current_dk.token());
                llog.trace("load_and_stream: ops_uuid={}, current_dk={}, current_targets={}", ops_uuid,
                        current_dk.token(), current_targets);
                // Open a stream to any target we have not talked to yet.
                for (auto& node : current_targets) {
                    if (!metas.contains(node)) {
                        auto [sink, source] = co_await _ms.make_sink_and_source_for_stream_mutation_fragments(reader.schema()->version(),
                                ops_uuid, cf_id, estimated_partitions, reason, service::default_session_id, node);
                        bool abort_supported = _ms.supports_load_and_stream_abort_rpc_message();
                        llog.debug("load_and_stream: ops_uuid={}, make sink and source for node={}", ops_uuid, node);
                        metas.emplace(node, send_meta_data(node, std::move(sink), std::move(source), abort_supported));
                        metas.at(node).receive();
                    }
                }
            }
            frozen_mutation_fragment fmf = freeze(*s, *mf);
            num_bytes_read += fmf.representation().size();
            // Fan the fragment out to all replicas of the current partition.
            co_await coroutine::parallel_for_each(current_targets, [&metas, &fmf, is_partition_start] (const locator::host_id& node) {
                return metas.at(node).send(fmf, is_partition_start);
            });
        }
    } catch (...) {
        failed = true;
        eptr = std::current_exception();
        llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, send_phase, err={}",
                ops_uuid, s->ks_name(), s->cf_name(), eptr);
    }
    co_await reader.close();
    // Phase 2: finish all per-node streams in parallel. An abort in the send
    // phase is propagated as an explicit abort command when supported.
    try {
        co_await coroutine::parallel_for_each(metas.begin(), metas.end(), [failed, eptr] (std::pair<const locator::host_id, send_meta_data>& pair) {
            auto& meta = pair.second;
            if (eptr) {
                try {
                    std::rethrow_exception(eptr);
                } catch (const abort_requested_exception&) {
                    return meta.finish(failed, true);
                } catch (...) {
                    // just fall through
                }
            }
            return meta.finish(failed, false);
        });
    } catch (...) {
        failed = true;
        eptr = std::current_exception();
        llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, finish_phase, err={}",
                ops_uuid, s->ks_name(), s->cf_name(), eptr);
    }
    // Phase 3: only after a fully successful stream, delete the source
    // SSTables if the caller asked for it.
    if (!failed && _unlink_sstables) {
        try {
            co_await coroutine::parallel_for_each(sstables, [&] (sstables::shared_sstable& sst) {
                llog.debug("load_and_stream: ops_uuid={}, ks={}, table={}, remove sst={}",
                        ops_uuid, s->ks_name(), s->cf_name(), sst->toc_filename());
                return sst->mark_for_deletion();
            });
        } catch (...) {
            failed = true;
            eptr = std::current_exception();
            llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, del_sst_phase, err={}",
                    ops_uuid, s->ks_name(), s->cf_name(), eptr);
        }
    }
    // Final reporting: per-node totals and overall throughput.
    auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start_time).count();
    for (auto& [node, meta] : metas) {
        llog.info("load_and_stream: ops_uuid={}, ks={}, table={}, target_node={}, num_partitions_sent={}, num_bytes_sent={}",
                ops_uuid, s->ks_name(), s->cf_name(), node, meta.num_partitions_sent(), meta.num_bytes_sent());
    }
    auto partition_rate = std::fabs(duration) > FLT_EPSILON ? num_partitions_processed / duration : 0;
    auto bytes_rate = std::fabs(duration) > FLT_EPSILON ? num_bytes_read / duration / 1024 / 1024 : 0;
    auto status = failed ? "failed" : "succeeded";
    llog.info("load_and_stream: finished ops_uuid={}, ks={}, table={}, partitions_processed={} partitions, bytes_processed={} bytes, partitions_per_second={} partitions/s, bytes_per_second={} MiB/s, duration={} s, status={}",
            ops_uuid, s->ks_name(), s->cf_name(), num_partitions_processed, num_bytes_read, partition_rate, bytes_rate, duration, status);
    if (failed) {
        std::rethrow_exception(eptr);
    }
}
|
|
|
|
// Obtain the table's effective_replication_map at a moment when topology is
// quiesced, retrying until the grab and the quiescence check line up.
future<locator::effective_replication_map_ptr> sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) {
    // By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle
    // of a topology operation that changes the token range boundaries, e.g. split or merge.
    // Split, for example, first executes the barrier and then splits the tablets.
    // So it can happen a sstable is generated between those steps and will incorrectly span two
    // tablets. We want to serialize load-and-stream and split finalization (a topology op).

    locator::effective_replication_map_ptr erm;
    while (true) {
        // Re-lookup the table each iteration: the previous erm was dropped
        // before awaiting, and topology may have changed in between.
        auto& t = _db.local().find_column_family(table_id);
        erm = t.get_effective_replication_map();
        auto expected_topology_version = erm->get_token_metadata().get_version();
        auto& ss = _ss.local();

        // The awaiting only works with raft enabled, and we only need it with tablets,
        // so let's bypass the awaiting when tablet is disabled.
        if (!t.uses_tablets()) {
            break;
        }
        // optimistically attempt to grab an erm on quiesced topology
        if (co_await ss.verify_topology_quiesced(expected_topology_version)) {
            break;
        }
        // Stale erm: release it before blocking so it doesn't pin old topology.
        erm = nullptr;
        co_await _ss.local().await_topology_quiesced();
    }

    co_return std::move(erm);
}
|
|
|
|
// Stream already-opened SSTables for one table on this shard, choosing the
// tablet-aware or vnode streamer according to the table's replication.
// ks_name/cf_name are accepted for interface symmetry; table lookup is by id.
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
        ::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary, bool unlink, stream_scope scope,
        shared_ptr<stream_progress> progress) {
    // streamer guarantees topology stability, for correctness, by holding effective_replication_map
    // throughout its lifetime.
    auto erm = co_await await_topology_quiesced_and_get_erm(table_id);

    const bool tablets = _db.local().find_column_family(table_id).uses_tablets();
    std::unique_ptr<sstable_streamer> streamer;
    if (tablets) {
        streamer = std::make_unique<tablet_sstable_streamer>(
                _messaging, _db, table_id, std::move(erm), std::move(sstables), primary, unlink_sstables(unlink), scope);
    } else {
        streamer = std::make_unique<sstable_streamer>(
                _messaging, _db.local(), table_id, std::move(erm), std::move(sstables), primary, unlink_sstables(unlink), scope);
    }

    co_await streamer->stream(progress);
}
|
|
|
|
// For more details, see distributed_loader::process_upload_dir().
|
|
// All the global operations are going to happen here, and just the reloading happens
|
|
// in there.
|
|
// For more details, see distributed_loader::process_upload_dir().
// All the global operations are going to happen here, and just the reloading happens
// in there.
//
// Loads SSTables from the table's upload directory, either via load-and-stream
// (required for tablets) or by reloading them in place. Serialized by the
// _loading_new_sstables flag, which is cleared on every exit path.
future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
        bool load_and_stream, bool primary, bool skip_cleanup, bool skip_reshape, stream_scope scope) {
    // The guard must be set before the first suspension point below.
    if (_loading_new_sstables) {
        throw std::runtime_error("Already loading SSTables. Try again later");
    } else {
        _loading_new_sstables = true;
    }

    // Everything after setting the guard runs inside the try block so that
    // *any* failure — including the parameter-validation throws and
    // find_keyspace() below, which previously escaped before reaching the
    // catch — resets _loading_new_sstables instead of wedging the loader
    // into permanently reporting "Already loading SSTables".
    try {
        co_await coroutine::switch_to(_sched_group);

        sstring load_and_stream_desc = fmt::format("{}", load_and_stream);
        const auto& rs = _db.local().find_keyspace(ks_name).get_replication_strategy();
        // Per-table replication (tablets) mandates load-and-stream.
        if (rs.is_per_table() && !load_and_stream) {
            load_and_stream = true;
            load_and_stream_desc = "auto-enabled-for-tablets";
        }

        if (load_and_stream && skip_cleanup) {
            throw std::runtime_error("Skipping cleanup is not possible when doing load-and-stream");
        }

        if (load_and_stream && skip_reshape) {
            throw std::runtime_error("Skipping reshape is not possible when doing load-and-stream");
        }

        llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, skip_cleanup={}, skip_reshape={}, scope={}",
                ks_name, cf_name, load_and_stream_desc, primary, skip_cleanup, skip_reshape, scope);
        if (load_and_stream) {
            ::table_id table_id;
            std::vector<std::vector<sstables::shared_sstable>> sstables_on_shards;
            // Load-and-stream reads the entire content from SSTables, therefore it can afford to discard the bloom filter
            // that might otherwise consume a significant amount of memory.
            sstables::sstable_open_config cfg {
                .load_bloom_filter = false,
                .ignore_component_digest_mismatch = _db.local().get_config().ignore_component_digest_mismatch(),
            };
            std::tie(table_id, sstables_on_shards) = co_await replica::distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name, cfg);
            // Each shard streams the SSTables it picked up from the upload dir.
            co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary, scope] (sstables_loader& loader) mutable -> future<> {
                co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only(primary), true, scope, {});
            });
        } else {
            co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, _view_building_worker, ks_name, cf_name, skip_cleanup, skip_reshape);
        }
    } catch (...) {
        llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",
                ks_name, cf_name, load_and_stream, primary, std::current_exception());
        _loading_new_sstables = false;
        throw;
    }
    llog.info("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=succeeded",
            ks_name, cf_name, load_and_stream, primary);
    _loading_new_sstables = false;
    co_return;
}
|
|
|
|
// Task-manager task that downloads sstables from an object-storage endpoint
// and load-and-streams them into the cluster. One instance exists per
// download request; per-shard streaming progress is aggregated on demand.
class sstables_loader::download_task_impl : public tasks::task_manager::task::impl {
    sharded<sstables_loader>& _loader;
    sstring _endpoint;  // object-storage endpoint name (must be known to storage_manager)
    sstring _bucket;
    sstring _ks;
    sstring _cf;
    sstring _prefix;    // object-name prefix under which the sstables live
    sstables_loader::stream_scope _scope;
    std::vector<sstring> _sstables;  // names of the sstables to download
    const primary_replica_only _primary_replica;
    struct progress_holder {
        // Wrap stream_progress in a smart pointer to enable polymorphism.
        // This allows derived progress types to be passed down for per-tablet
        // progress tracking while maintaining the base interface.
        shared_ptr<stream_progress> progress = make_shared<stream_progress>();
    };
    // Guards _progress_state / _progress_per_shard against concurrent
    // get_progress() calls racing with initialization and teardown.
    mutable shared_mutex _progress_mutex;
    // user could query for the progress even before _progress_per_shard
    // is completely started, and this._status.state does not reflect the
    // state of progress, so we have to track it separately.
    enum class progress_state {
        uninitialized,
        initialized,
        finalized,
    } _progress_state = progress_state::uninitialized;
    sharded<progress_holder> _progress_per_shard;
    // Snapshot of the aggregated progress taken in release_resources(), so
    // get_progress() keeps working after _progress_per_shard is stopped.
    tasks::task_manager::task::progress _final_progress;

protected:
    virtual future<> run() override;

public:
    download_task_impl(tasks::task_manager::module_ptr module, sharded<sstables_loader>& loader,
            sstring endpoint, sstring bucket, sstring ks, sstring cf, sstring prefix, std::vector<sstring> sstables,
            sstables_loader::stream_scope scope, primary_replica_only primary_replica) noexcept
        : tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
        , _loader(loader)
        , _endpoint(std::move(endpoint))
        , _bucket(std::move(bucket))
        , _ks(std::move(ks))
        , _cf(std::move(cf))
        , _prefix(std::move(prefix))
        , _scope(scope)
        , _sstables(std::move(sstables))
        , _primary_replica(primary_replica)
    {
        _status.progress_units = "batches";
    }

    virtual std::string type() const override {
        return "download_sstables";
    }

    virtual tasks::is_internal is_internal() const noexcept override {
        return tasks::is_internal::no;
    }

    virtual tasks::is_user_task is_user_task() const noexcept override {
        return tasks::is_user_task::yes;
    }

    tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable::yes;
    }

    virtual future<> release_resources() noexcept override {
        // preserve the final progress, so we can access it after the task is
        // finished
        _final_progress = co_await get_progress();
        co_await with_lock(_progress_mutex, [this] -> future<> {
            // Only stop the sharded service if it was actually started;
            // mark it finalized either way so get_progress() stops touching it.
            if (std::exchange(_progress_state, progress_state::finalized) == progress_state::initialized) {
                co_await _progress_per_shard.stop();
            }
        });
    }

    // Aggregates per-shard stream progress into a single completed/total pair.
    // Behavior depends on _progress_state: empty before initialization, the
    // preserved snapshot after finalization, live aggregation in between.
    virtual future<tasks::task_manager::task::progress> get_progress() const override {
        co_return co_await with_shared(_progress_mutex, [this] -> future<tasks::task_manager::task::progress> {
            switch (_progress_state) {
            case progress_state::uninitialized:
                co_return tasks::task_manager::task::progress{};
            case progress_state::finalized:
                co_return _final_progress;
            case progress_state::initialized:
                break;
            }
            auto p = co_await _progress_per_shard.map_reduce(
                adder<stream_progress>{},
                [] (const progress_holder& holder) -> stream_progress {
                    auto p = holder.progress;
                    SCYLLA_ASSERT(p);
                    return *p;
                });
            co_return tasks::task_manager::task::progress {
                .completed = p.completed,
                .total = p.total,
            };
        });
    }
};
|
|
|
|
// Downloads the requested sstables from the object store and load-and-streams
// them on every shard. Cancellable through the task's abort source, which is
// fanned out to one abort_source per shard.
future<> sstables_loader::download_task_impl::run() {
    // Load-and-stream reads the entire content from SSTables, therefore it can afford to discard the bloom filter
    // that might otherwise consume a significant amount of memory.
    sstables::sstable_open_config cfg {
        .load_bloom_filter = false,
        .ignore_component_digest_mismatch = _loader.local()._db.local().get_config().ignore_component_digest_mismatch(),
    };
    llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);

    auto ep_type = _loader.local()._storage_manager.get_endpoint_type(_endpoint);
    // One abort source per shard; the downloader picks up its shard's entry
    // via the callback below, and the task abort fans out to all of them.
    std::vector<seastar::abort_source> shard_aborts(smp::count);
    auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, ep_type, _bucket, _prefix, cfg, [&] {
        return &shard_aborts[this_shard_id()];
    });
    llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
    std::exception_ptr ex;
    // The gate keeps the detached abort-propagation work alive until closed
    // below, so shard_aborts cannot be destroyed while still in use.
    named_gate g("sstables_loader::download_task_impl");
    try {
        _as.check();

        // On abort, propagate the abort exception to every shard's
        // abort_source. The invoke_on_all future is deliberately detached;
        // the gate hold keeps the surrounding state alive until it resolves.
        auto s = _as.subscribe([&]() noexcept {
            try {
                auto h = g.hold();
                (void)smp::invoke_on_all([&shard_aborts, ex = _as.abort_requested_exception_ptr()] {
                    shard_aborts[this_shard_id()].request_abort_ex(ex);
                }).finally([h = std::move(h)] {});
            } catch (...) {
            }
        });
        co_await _progress_per_shard.start();
        _progress_state = progress_state::initialized;
        // Each shard streams its own slice; per-shard progress is written into
        // that shard's progress_holder for get_progress() to aggregate.
        co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
            co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), _primary_replica, false, _scope,
                    _progress_per_shard.local().progress);
        });
    } catch (...) {
        ex = std::current_exception();
    }

    co_await g.close();

    // If an abort came in, report the abort reason even when the streaming
    // itself happened to finish without an error.
    if (_as.abort_requested()) {
        if (!ex) {
            ex = _as.abort_requested_exception_ptr();
        }
    }

    if (ex) {
        // Release any still-unconsumed sstables on their owning shard — the
        // shared pointers must be dropped on the shard that created them.
        co_await _loader.invoke_on_all([&sstables_on_shards] (sstables_loader&) {
            sstables_on_shards[this_shard_id()] = {}; // clear on correct shard
        });
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}
|
|
|
|
// Wires the loader into the task manager and registers the RESTORE_TABLET
// RPC verb used by tablet restore transitions.
sstables_loader::sstables_loader(sharded<replica::database>& db,
        sharded<service::storage_service>& ss,
        netw::messaging_service& messaging,
        sharded<db::view::view_builder>& vb,
        sharded<db::view::view_building_worker>& vbw,
        tasks::task_manager& tm,
        sstables::storage_manager& sstm,
        db::system_distributed_keyspace& sys_dist_ks,
        seastar::scheduling_group sg)
    : _db(db)
    , _ss(ss)
    , _messaging(messaging)
    , _view_builder(vb)
    , _view_building_worker(vbw)
    , _task_manager_module(make_shared<task_manager_module>(tm))
    , _storage_manager(sstm)
    , _sys_dist_ks(sys_dist_ks)
    , _sched_group(std::move(sg))
{
    tm.register_module("sstables_loader", _task_manager_module);
    // RESTORE_TABLET handler: runs the download as a tablet operation under
    // the tablet metadata guard, serializing it with other tablet transitions.
    ser::sstables_loader_rpc_verbs::register_restore_tablet(&_messaging, [this] (raft::server_id dst_id, locator::global_tablet_id gid) -> future<restore_result> {
        return _ss.local().handle_raft_rpc(dst_id, [&sl = container(), gid] (auto& ss) {
            return ss.do_tablet_operation(gid, "Restore", [&sl, gid] (locator::tablet_metadata_guard& guard) -> future<service::tablet_operation_result> {
                co_await sl.local().download_tablet_sstables(gid, guard);
                co_return service::tablet_operation_empty_result{};
            }).then([] (auto res) {
                return make_ready_future<restore_result>();
            });
        });
    });
}
|
|
|
|
// Shuts the loader down: unregister the RPC verbs first so no new restore
// work can arrive, then stop the task-manager module.
future<> sstables_loader::stop() {
    // Fix: these two awaits were previously joined by a comma operator
    // (`co_await ...unregister(&_messaging),`) — sequencing happened to be
    // correct, but it was a typo; make them separate statements.
    co_await ser::sstables_loader_rpc_verbs::unregister(&_messaging);
    co_await _task_manager_module->stop();
}
|
|
|
|
// Starts an asynchronous download-and-stream of the given sstables from an
// object-storage endpoint into ks_name.cf_name. Returns the id of the
// task-manager task doing the work; the caller polls that task for status.
future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, sstring cf_name,
        sstring prefix, std::vector<sstring> sstables,
        sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica) {
    // Validate the endpoint up-front so the caller gets an immediate error
    // instead of a failed background task.
    if (!_storage_manager.is_known_endpoint(endpoint)) {
        throw std::invalid_argument(format("endpoint {} not found", endpoint));
    }
    llog.info("Restore sstables from {}({}) to {}.{} using scope={}, primary_replica={}", endpoint, prefix, ks_name, cf_name, scope, primary_replica);

    // The actual work happens in download_task_impl::run().
    auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name),
            std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
    co_return task->id();
}
|
|
|
|
// Attaches an already-downloaded sstable (described by min_info) to table tid
// on the current shard: loads it, adds it to the table's sstable set, and
// seals it as part of the add. Returns the attached sstable.
future<sstables::shared_sstable> sstables_loader::attach_sstable(table_id tid, const minimal_sst_info& min_info) const {
    auto& db = _db.local();
    auto& table = db.find_column_family(tid);
    llog.debug("Adding downloaded SSTables to the table {} on shard {}", table.schema()->cf_name(), this_shard_id());
    auto& sst_manager = table.get_sstables_manager();
    auto sst = sst_manager.make_sstable(
        table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format);
    // Downloaded sstables start at level 0, like any freshly added sstable.
    sst->set_sstable_level(0);
    auto erm = table.get_effective_replication_map();
    sstables::sstable_open_config cfg {
        // The sstable was written by the downloader and has not been sealed yet.
        .unsealed_sstable = true,
        .ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch(),
    };
    co_await sst->load(erm->get_sharder(*table.schema()), cfg);
    // The identifier is what tablet restore later uses to record per-sstable
    // download status, so its absence is an internal error here.
    if (!sst->sstable_identifier()) {
        on_internal_error(llog, "sstable identifier is required for tablet restore");
    }
    co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, sst] (sstables::shared_sstable loading_sst) -> future<> {
        // Seal only the sstable we are adding, not any other sstable the
        // callback may be invoked for.
        if (loading_sst == sst) {
            auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin());
            co_await loading_sst->seal_sstable(writer_cfg.backup);
        }
    });
    co_return sst;
}
|
|
|
|
// Downloads the sstables belonging to a single tablet during a restore
// transition: looks up the sstable list for the tablet's token range in
// system_distributed.snapshot_sstables, downloads the fully-contained ones,
// attaches them on their owning shards, and records each one as downloaded.
// Runs under the tablet metadata guard (see the RESTORE_TABLET verb handler).
future<> sstables_loader::download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard& guard) {
    auto& tmap = guard.get_tablet_map();

    // The restore configuration lives on the tablet's transition info; both
    // must exist for a restore download to make sense.
    auto* trinfo = tmap.get_tablet_transition_info(tid.tablet);
    if (!trinfo) {
        throw std::runtime_error(fmt::format("No transition info for tablet {}", tid));
    }
    if (!trinfo->restore_cfg) {
        throw std::runtime_error(format("No restore config for tablet {} restore transition", tid));
    }

    locator::restore_config restore_cfg = *trinfo->restore_cfg;
    llog.info("Downloading sstables for tablet {} from {}@{}/{}", tid, restore_cfg.snapshot_name, restore_cfg.endpoint, restore_cfg.bucket);

    auto s = _db.local().find_schema(tid.table);
    auto tablet_range = tmap.get_token_range(tid.tablet);
    const auto& topo = guard.get_token_metadata()->get_topology();
    auto keyspace_name = s->ks_name();
    auto table_name = s->cf_name();
    auto datacenter = topo.get_datacenter();
    auto rack = topo.get_rack();

    // Query the snapshot-sstables metadata restricted to this tablet's token range.
    auto sst_infos = co_await _sys_dist_ks.get_snapshot_sstables(restore_cfg.snapshot_name, keyspace_name, table_name, datacenter, rack,
            db::consistency_level::LOCAL_QUORUM, tablet_range.start().transform([] (auto& v) { return v.value(); }), tablet_range.end().transform([] (auto& v) { return v.value(); }));
    llog.debug("{} SSTables found for tablet {}", sst_infos.size(), tid);
    if (sst_infos.empty()) {
        throw std::runtime_error(format("No SSTables found in system_distributed.snapshot_sstables for {}", restore_cfg.snapshot_name));
    }

    // Split into sstables fully vs. partially contained in the tablet range;
    // a partially-contained sstable cannot be restored into a single tablet.
    auto [ fully, partially ] = co_await get_sstables_for_tablet(sst_infos, tablet_range, [] (const auto& si) { return si.first_token; }, [] (const auto& si) { return si.last_token; });
    if (!partially.empty()) {
        llog.debug("Sstable {} is partially contained", partially.front().sstable_id);
        throw std::logic_error("sstables_partially_contained");
    }
    llog.debug("{} SSTables filtered by range {} for tablet {}", fully.size(), tablet_range, tid);
    if (fully.empty()) {
        // It can happen that a tablet exists and contains no data. Just skip it
        co_return;
    }

    // Group TOC names by their object-store prefix, so each prefix is fetched
    // with one get_sstables_from_object_store() call below.
    std::unordered_map<sstring, std::vector<sstring>> toc_names_by_prefix;
    for (const auto& e : fully) {
        toc_names_by_prefix[e.prefix].emplace_back(e.toc_name);
    }

    auto ep_type = _storage_manager.get_endpoint_type(restore_cfg.endpoint);
    sstables::sstable_open_config cfg {
        // Bloom filters are not needed for restore, so save the memory.
        .load_bloom_filter = false,
    };

    using sstables_col = std::vector<sstables::shared_sstable>;
    using prefix_sstables = std::vector<sstables_col>;

    // Fetch sstable objects for every prefix, collecting per-shard columns of
    // sstables: sstables_on_shards[shard] is a list of per-prefix collections.
    auto sstables_on_shards = co_await map_reduce(toc_names_by_prefix, [&] (auto ent) {
        return replica::distributed_loader::get_sstables_from_object_store(_db, s->ks_name(), s->cf_name(),
                std::move(ent.second), restore_cfg.endpoint, ep_type, restore_cfg.bucket, std::move(ent.first), cfg, [&] { return nullptr; }).then_unpack([] (table_id, auto sstables) {
            return make_ready_future<std::vector<sstables_col>>(std::move(sstables));
        });
    }, std::vector<prefix_sstables>(smp::count), [&] (std::vector<prefix_sstables> a, std::vector<sstables_col> b) {
        // We can't move individual elements of b[i], because these
        // are lw_shared_ptr-s collected on another shard. So we move
        // the whole sstables_col here so that subsequent code will
        // walk over it and move the pointers where it wants on proper
        // shard.
        for (unsigned i = 0; i < smp::count; i++) {
            a[i].push_back(std::move(b[i]));
        }
        return a;
    });

    // Download sstable data on each shard (up to 16 concurrently per shard),
    // producing minimal_sst_info records bucketed by the shard that will own
    // the attached sstable (min_info.shard).
    auto downloaded_ssts = co_await container().map_reduce0(
        [tid, &sstables_on_shards](auto& loader) -> future<std::vector<std::vector<minimal_sst_info>>> {
            // Flatten this shard's per-prefix collections into one work list.
            sstables_col sst_chunk;
            for (auto& psst : sstables_on_shards[this_shard_id()]) {
                for (auto&& sst : psst) {
                    sst_chunk.push_back(std::move(sst));
                }
            }
            std::vector<std::vector<minimal_sst_info>> local_min_infos(smp::count);
            co_await max_concurrent_for_each(sst_chunk, 16, [&loader, tid, &local_min_infos](const auto& sst) -> future<> {
                auto& table = loader._db.local().find_column_family(tid.table);
                auto min_info = co_await download_sstable(loader._db.local(), table, sst, llog);
                local_min_infos[min_info.shard].emplace_back(std::move(min_info));
            });
            co_return local_min_infos;
        },
        std::vector<std::vector<minimal_sst_info>>(smp::count),
        [](auto init, auto&& item) -> std::vector<std::vector<minimal_sst_info>> {
            // Merge per-shard buckets produced by each shard's downloader.
            for (std::size_t i = 0; i < item.size(); ++i) {
                init[i].append_range(std::move(item[i]));
            }
            return init;
        });

    // Attach each downloaded sstable on its owning shard and mark it as
    // downloaded in system_distributed.snapshot_sstables; the restore task
    // later verifies these flags to decide whether the restore succeeded.
    co_await container().invoke_on_all([tid, &downloaded_ssts, snap_name = restore_cfg.snapshot_name, keyspace_name, table_name, datacenter, rack] (auto& loader) -> future<> {
        auto shard_ssts = std::move(downloaded_ssts[this_shard_id()]);
        co_await max_concurrent_for_each(shard_ssts, 16, [&loader, tid, snap_name, keyspace_name, table_name, datacenter, rack](const auto& min_info) -> future<> {
            sstables::shared_sstable attached_sst = co_await loader.attach_sstable(tid.table, min_info);
            co_await loader._sys_dist_ks.update_sstable_download_status(snap_name,
                    keyspace_name,
                    table_name,
                    datacenter,
                    rack,
                    *attached_sst->sstable_identifier(),
                    attached_sst->get_first_decorated_key().token(),
                    db::is_downloaded::yes);
        });
    });
}
|
|
|
|
// Test-only shim exposing tablet_sstable_streamer's sstable-to-tablet
// assignment logic without having to construct a streamer instance.
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& ssts,
        std::vector<dht::token_range>&& ranges) {
    return tablet_sstable_streamer::get_sstables_for_tablets(ssts, std::move(ranges));
}
|
|
|
|
// Parses one snapshot manifest (JSON) read from `is` and inserts a row into
// system_distributed.snapshot_sstables for every sstable it lists. Returns
// the tablet_count declared by the manifest. `keyspace`/`table` may be empty,
// in which case they are taken from the manifest itself. Throws if the
// manifest belongs to a different snapshot or is malformed.
static future<size_t> process_manifest(input_stream<char>& is, sstring keyspace, sstring table,
        const sstring& expected_snapshot_name,
        const sstring& manifest_prefix, db::system_distributed_keyspace& sys_dist_ks,
        db::consistency_level cl) {
    // Read the entire JSON content
    rjson::chunked_content content = co_await util::read_entire_stream(is);

    rjson::value parsed = rjson::parse(std::move(content));

    // Basic validation that tablet_count is a power of 2, as expected by our restore process
    size_t tablet_count = parsed["table"]["tablet_count"].GetUint64();
    if (!std::has_single_bit(tablet_count)) {
        on_internal_error(llog, fmt::format("Invalid tablet_count {} in manifest {}, expected a power of 2", tablet_count, manifest_prefix));
    }

    // Extract the necessary fields from the manifest
    // Expected JSON structure documented in docs/dev/object_storage.md
    auto snapshot_name = rjson::to_sstring(parsed["snapshot"]["name"]);
    if (snapshot_name != expected_snapshot_name) {
        throw std::runtime_error(fmt::format("Manifest {} belongs to snapshot '{}', expected '{}'",
                manifest_prefix, snapshot_name, expected_snapshot_name));
    }
    if (keyspace.empty()) {
        keyspace = rjson::to_sstring(parsed["table"]["keyspace_name"]);
    }
    if (table.empty()) {
        table = rjson::to_sstring(parsed["table"]["table_name"]);
    }
    auto datacenter = rjson::to_sstring(parsed["node"]["datacenter"]);
    auto rack = rjson::to_sstring(parsed["node"]["rack"]);

    // Process each sstable entry in the manifest
    // FIXME: cleanup of the snapshot-related rows is needed in case anything throws in here.
    auto sstables = rjson::find(parsed, "sstables");
    if (!sstables) {
        // A manifest with no sstables is valid (e.g. an empty table).
        co_return tablet_count;
    }
    if (!sstables->IsArray()) {
        throw std::runtime_error("Malformed manifest, 'sstables' is not array");
    }

    for (auto& sstable_entry : sstables->GetArray()) {
        auto id = rjson::to_sstable_id(sstable_entry["id"]);
        auto first_token = rjson::to_token(sstable_entry["first_token"]);
        auto last_token = rjson::to_token(sstable_entry["last_token"]);
        auto toc_name = rjson::to_sstring(sstable_entry["toc_name"]);
        // The sstables live next to the manifest, i.e. under its parent path.
        auto prefix = sstring(std::filesystem::path(manifest_prefix).parent_path().string());
        // Insert the snapshot sstable metadata into system_distributed.snapshot_sstables with a TTL of 3 days, that should be enough
        // for any snapshot restore operation to complete, and after that the metadata will be automatically cleaned up from the table
        co_await sys_dist_ks.insert_snapshot_sstable(snapshot_name, keyspace, table, datacenter, rack, id, first_token, last_token,
                toc_name, prefix, db::is_downloaded::no, cl);
    }

    co_return tablet_count;
}
|
|
|
|
// Downloads all the given snapshot manifests (up to 16 concurrently) and
// populates system_distributed.snapshot_sstables with their content via
// process_manifest(). Returns the tablet_count shared by all manifests;
// throws if manifests disagree on it, or if no manifest was processed.
future<size_t> populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring keyspace, sstring table, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector<sstring> manifest_prefixes, db::consistency_level cl) {
    // Download manifests in parallel and populate system_distributed.snapshot_sstables
    // with the content extracted from each manifest
    auto client = sm.get_endpoint_client(endpoint);

    // tablet_count to be returned by this function, we also validate that all manifests passed contain the same tablet count
    std::optional<size_t> tablet_count;

    co_await seastar::max_concurrent_for_each(manifest_prefixes, 16, [&] (const sstring& manifest_prefix) {
        // Download the manifest JSON file
        sstables::object_name name(bucket, manifest_prefix);
        auto source = client->make_download_source(name);
        return seastar::with_closeable(input_stream<char>(std::move(source)), [&] (input_stream<char>& is) {
            return process_manifest(is, keyspace, table, expected_snapshot_name, manifest_prefix, sys_dist_ks, cl).then([&](size_t count) {
                if (!tablet_count) {
                    tablet_count = count;
                } else if (*tablet_count != count) {
                    throw std::runtime_error(fmt::format("Inconsistent tablet_count values in manifest {}: expected {}, got {}", manifest_prefix, *tablet_count, count));
                }
            });
        });
    });

    // Guard against an empty manifest list: dereferencing the empty optional
    // below would be undefined behavior.
    if (!tablet_count) {
        throw std::runtime_error(fmt::format("No manifests processed for snapshot '{}'", expected_snapshot_name));
    }
    co_return *tablet_count;
}
|
|
|
|
// Task-manager task driving a tablet-based snapshot restore for one table.
// It triggers the restore transitions via storage_service and then verifies,
// against system_distributed.snapshot_sstables, that every sstable was in
// fact downloaded — resolving the task with an exception otherwise.
class sstables_loader::tablet_restore_task_impl : public tasks::task_manager::task::impl {
    sharded<sstables_loader>& _loader;
    table_id _tid;
    sstring _snap_name;
    sstring _endpoint;
    sstring _bucket;

public:
    tablet_restore_task_impl(tasks::task_manager::module_ptr module, sharded<sstables_loader>& loader, sstring ks,
            table_id tid, sstring snap_name, sstring endpoint, sstring bucket) noexcept
        : tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
        , _loader(loader)
        , _tid(std::move(tid))
        , _snap_name(std::move(snap_name))
        , _endpoint(std::move(endpoint))
        , _bucket(std::move(bucket))
    {
        _status.progress_units = "batches";
    }

    virtual std::string type() const override {
        return "restore_tablets";
    }

    virtual tasks::is_internal is_internal() const noexcept override {
        return tasks::is_internal::no;
    }

    virtual tasks::is_user_task is_user_task() const noexcept override {
        return tasks::is_user_task::yes;
    }

    tasks::is_abortable is_abortable() const noexcept override {
        return tasks::is_abortable::no;
    }

protected:
    virtual future<> run() override {
        auto& loader = _loader.local();
        // restore_tablets() resolving only means the tablet transitions
        // (including restore transitions) are done — not that they all
        // succeeded. Success is checked below against the per-sstable
        // download status recorded in system_distributed.snapshot_sstables.
        co_await loader._ss.local().restore_tablets(_tid, _snap_name, _endpoint, _bucket);

        auto& db = loader._db.local();
        auto s = db.find_schema(_tid);
        const auto& topo = db.get_token_metadata().get_topology();
        auto dc = topo.get_datacenter();

        // Walk every rack of the local DC; any sstable still marked as not
        // downloaded means some replica failed, so fail the task.
        for (const auto& rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
            auto result = co_await loader._sys_dist_ks.get_snapshot_sstables(_snap_name, s->ks_name(), s->cf_name(), dc, rack);
            auto it = std::find_if(result.begin(), result.end(), [] (const auto& ent) { return !ent.downloaded; });
            if (it != result.end()) {
                llog.warn("Some replicas failed to download SSTables for {}:{} from {}", s->ks_name(), s->cf_name(), _snap_name);
                throw std::runtime_error(format("Failed to download {}", it->toc_name));
            }
        }
    }
};
|
|
|
|
// Starts an asynchronous tablet-based restore of a snapshot into table tid.
// First populates system_distributed.snapshot_sstables from the given
// manifests, then launches a tablet_restore_task_impl that drives and
// verifies the restore. Returns the id of that task.
future<tasks::task_id> sstables_loader::restore_tablets(table_id tid, sstring keyspace, sstring table, sstring snap_name, sstring endpoint, sstring bucket, utils::chunked_vector<sstring> manifests) {
    co_await populate_snapshot_sstables_from_manifests(_storage_manager, _sys_dist_ks, keyspace, table, endpoint, bucket, snap_name, std::move(manifests));
    auto task = co_await _task_manager_module->make_and_start_task<tablet_restore_task_impl>({}, container(), keyspace, tid, std::move(snap_name), std::move(endpoint), std::move(bucket));
    co_return task->id();
}
|