mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 00:50:35 +00:00
sstables_loader: move download_sstable and get_sstables_for_tablet
Move the download_sstable and get_sstables_for_tablet static functions from sstables_loader into a new file to make them reusable by the tablet-aware restore code.
This commit is contained in:
committed by
Pavel Emelyanov
parent
5422adc6f6
commit
4d628dbcc4
@@ -299,6 +299,7 @@ target_sources(scylla-main
|
||||
serializer.cc
|
||||
service/direct_failure_detector/failure_detector.cc
|
||||
sstables_loader.cc
|
||||
sstables_loader_helpers.cc
|
||||
table_helper.cc
|
||||
tasks/task_handler.cc
|
||||
tasks/task_manager.cc
|
||||
|
||||
@@ -1326,6 +1326,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'ent/ldap/ldap_connection.cc',
|
||||
'reader_concurrency_semaphore.cc',
|
||||
'sstables_loader.cc',
|
||||
'sstables_loader_helpers.cc',
|
||||
'utils/utf8.cc',
|
||||
'utils/ascii.cc',
|
||||
'utils/like_matcher.cc',
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "sstables_loader_helpers.hh"
|
||||
|
||||
#include "sstables/object_storage_client.hh"
|
||||
#include "utils/rjson.hh"
|
||||
@@ -208,12 +209,6 @@ private:
|
||||
return result;
|
||||
}
|
||||
|
||||
// Compact description of an sstable produced by download_sstable().
// Member order matters: the struct is brace-initialised positionally.
struct minimal_sst_info {
|
||||
// Shard the source sstable belongs to (download_sstable() requires exactly one).
shard_id _shard;
|
||||
// Generation assigned to the downloaded copy by the table's generation generator.
sstables::generation_type _generation;
|
||||
// Version taken from the source sstable's descriptor.
sstables::sstable_version_types _version;
|
||||
// Format taken from the source sstable's descriptor.
sstables::sstable_format_types _format;
|
||||
};
|
||||
// Buckets of downloaded-sstable descriptors. Instances visible here are
// constructed with smp::count outer entries - presumably one bucket per
// shard; confirm against the code that fills them.
using sst_classification_info = std::vector<std::vector<minimal_sst_info>>;
|
||||
|
||||
future<> attach_sstable(shard_id from_shard, const sstring& ks, const sstring& cf, const minimal_sst_info& min_info) const {
|
||||
@@ -258,100 +253,6 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
// Copies every component of `sstable` into the storage of `table` under a
// newly generated generation, streaming one component at a time through an
// sstable stream sink that leaves the result unsealed.
//
// db      - used to obtain the reader permit needed to stream the Data component
// table   - destination: supplies schema, sstables manager, storage options
//           and the generation generator
// sstable - source sstable; it must belong to exactly one shard, otherwise an
//           internal error is raised
// logger  - receives the error and debug messages
//
// Returns the source sstable's shard plus the new generation and the source
// version/format. Any failure is logged once and rethrown; if no component
// closes the sink with an sstable, std::logic_error is thrown.
static future<minimal_sst_info> download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger) {
    constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true};
    constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10};
    auto components = sstable->all_components();

    // Process the TOC first: `sstables::create_stream_sink` creates a
    // TemporaryTOC behind the scenes instead of the regular one, which ensures
    // that a partially created SSTable is eventually cleaned up after a failure.
    auto toc_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::TOC; });
    // Guard against a missing TOC: dereferencing end() would be undefined behaviour.
    if (toc_it != components.end() && toc_it != components.begin()) {
        swap(*toc_it, components.front());
    }

    // Process the Scylla component second.
    //
    // The sstable_sink->output() call for each component may invoke
    // load_metadata() and save_metadata(), but these only operate correctly if
    // the Scylla component file already exists on disk. Otherwise they become
    // no-ops and leave the original Scylla component (with outdated metadata)
    // untouched.
    //
    // With the Scylla component in second position:
    //   1) the TOC is written and the Scylla component file already exists on
    //      disk when subsequent output() calls happen;
    //   2) later output() calls overwrite the Scylla component with the
    //      correct, updated metadata.
    //
    // There is no second slot with fewer than two components, and the Scylla
    // component may be absent, so both cases are checked before swapping.
    if (components.size() > 1) {
        auto scylla_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::Scylla; });
        auto second_it = std::next(components.begin());
        if (scylla_it != components.end() && scylla_it != second_it) {
            swap(*scylla_it, *second_it);
        }
    }

    auto gen = table.get_sstable_generation_generator()();
    auto files = co_await sstable->readable_file_for_all_components();
    for (auto it = components.cbegin(); it != components.cend(); ++it) {
        try {
            auto descriptor = sstable->get_descriptor(it->first);
            auto sstable_sink = sstables::create_stream_sink(
                    table.schema(),
                    table.get_sstables_manager(),
                    table.get_storage_options(),
                    sstables::sstable_state::normal,
                    sstables::sstable::component_basename(
                            table.schema()->ks_name(), table.schema()->cf_name(), descriptor.version, gen, descriptor.format, it->first),
                    sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend(),
                                                      .leave_unsealed = true});
            auto out = co_await sstable_sink->output(foptions, stream_options);

            // From here on `out` is open, so every failure (including a failure
            // to open the source) must go through the cleanup below.
            input_stream<char> src;
            bool src_opened = false;
            std::exception_ptr eptr;
            try {
                auto component_file = files.at(it->first);
                if (it->first != sstables::component_type::Data) {
                    const auto fis_options = file_input_stream_options{.buffer_size = 128_KiB, .read_ahead = 2};
                    src = input_stream<char>(co_await sstable->get_storage().make_source(
                            *sstable, it->first, component_file, 0, std::numeric_limits<size_t>::max(), fis_options));
                } else {
                    auto permit = co_await db.obtain_reader_permit(table, "download_fully_contained_sstables", db::no_timeout, {});
                    // Compressed data is copied raw (on-disk bytes); uncompressed data is copied as is.
                    src = co_await (
                            sstable->get_compression()
                                    ? sstable->data_stream(0, sstable->ondisk_data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::yes)
                                    : sstable->data_stream(0, sstable->data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::no));
                }
                src_opened = true;
                co_await seastar::copy(src, out);
            } catch (...) {
                eptr = std::current_exception();
            }

            // co_await is not allowed inside a handler, hence the deferred
            // cleanup. Both streams are always closed; the first error wins.
            if (src_opened) {
                try {
                    co_await src.close();
                } catch (...) {
                    if (!eptr) {
                        eptr = std::current_exception();
                    }
                }
            }
            try {
                co_await out.close();
            } catch (...) {
                if (!eptr) {
                    eptr = std::current_exception();
                }
            }
            if (eptr) {
                co_await sstable_sink->abort();
                // Logged by the enclosing handler.
                std::rethrow_exception(eptr);
            }

            // The sink yields an sstable only once it considers the download complete.
            if (auto sst = co_await sstable_sink->close()) {
                const auto& shards = sstable->get_shards_for_this_sstable();
                if (shards.size() != 1) {
                    on_internal_error(logger, "Fully-contained sstable must belong to one shard only");
                }
                logger.debug("SSTable shards {}", fmt::join(shards, ", "));
                co_return minimal_sst_info{shards.front(), gen, descriptor.version, descriptor.format};
            }
        } catch (...) {
            logger.info("Error downloading SSTable component {}. Reason: {}", it->first, std::current_exception());
            throw;
        }
    }
    throw std::logic_error("SSTable must have at least one component");
}
|
||||
|
||||
future<sst_classification_info> download_fully_contained_sstables(std::vector<sstables::shared_sstable> sstables) const {
|
||||
sst_classification_info downloaded_sstables(smp::count);
|
||||
for (const auto& sstable : sstables) {
|
||||
@@ -365,9 +266,6 @@ private:
|
||||
|
||||
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
|
||||
std::vector<dht::token_range>&& tablets_ranges);
|
||||
template <std::ranges::input_range Range>
|
||||
static future<std::tuple<std::vector<sstables::shared_sstable>, std::vector<sstables::shared_sstable>>>
|
||||
get_sstables_for_tablet(Range&& sstables, const dht::token_range& tablet_range);
|
||||
// Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from
|
||||
// the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else.
|
||||
static future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
|
||||
@@ -510,34 +408,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template <std::ranges::input_range Range>
|
||||
future<std::tuple<std::vector<sstables::shared_sstable>, std::vector<sstables::shared_sstable>>>
|
||||
tablet_sstable_streamer::get_sstables_for_tablet(Range&& sstables, const dht::token_range& tablet_range) {
|
||||
std::vector<sstables::shared_sstable> fully_contained;
|
||||
std::vector<sstables::shared_sstable> partially_contained;
|
||||
for (const auto& sst : sstables) {
|
||||
auto sst_first = sst->get_first_decorated_key().token();
|
||||
auto sst_last = sst->get_last_decorated_key().token();
|
||||
|
||||
// SSTable entirely after tablet -> no further SSTables (larger keys) can overlap
|
||||
if (tablet_range.after(sst_first, dht::token_comparator{})) {
|
||||
break;
|
||||
}
|
||||
// SSTable entirely before tablet -> skip and continue scanning later (larger keys)
|
||||
if (tablet_range.before(sst_last, dht::token_comparator{})) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) {
|
||||
fully_contained.push_back(sst);
|
||||
} else {
|
||||
partially_contained.push_back(sst);
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return std::make_tuple(std::move(fully_contained), std::move(partially_contained));
|
||||
}
|
||||
|
||||
future<std::vector<tablet_sstable_collection>> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
|
||||
std::vector<dht::token_range>&& tablets_ranges) {
|
||||
auto tablets_sstables =
|
||||
|
||||
108
sstables_loader_helpers.cc
Normal file
108
sstables_loader_helpers.cc
Normal file
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#include "sstables_loader_helpers.hh"
|
||||
|
||||
#include <seastar/core/file.hh>
|
||||
#include <seastar/core/units.hh>
|
||||
#include <seastar/core/fstream.hh>
|
||||
#include "replica/database.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
|
||||
// Copies every component of `sstable` into the storage of `table` under a
// newly generated generation, streaming one component at a time through an
// sstable stream sink that leaves the result unsealed.
//
// db      - used to obtain the reader permit needed to stream the Data component
// table   - destination: supplies schema, sstables manager, storage options
//           and the generation generator
// sstable - source sstable; it must belong to exactly one shard, otherwise an
//           internal error is raised
// logger  - receives the error and debug messages
//
// Returns the source sstable's shard plus the new generation and the source
// version/format. Any failure is logged once and rethrown; if no component
// closes the sink with an sstable, std::logic_error is thrown.
future<minimal_sst_info> download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger) {
    constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true};
    constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10};
    auto components = sstable->all_components();

    // Process the TOC first: `sstables::create_stream_sink` creates a
    // TemporaryTOC behind the scenes instead of the regular one, which ensures
    // that a partially created SSTable is eventually cleaned up after a failure.
    auto toc_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::TOC; });
    // Guard against a missing TOC: dereferencing end() would be undefined behaviour.
    if (toc_it != components.end() && toc_it != components.begin()) {
        swap(*toc_it, components.front());
    }

    // Process the Scylla component second.
    //
    // The sstable_sink->output() call for each component may invoke
    // load_metadata() and save_metadata(), but these only operate correctly if
    // the Scylla component file already exists on disk. Otherwise they become
    // no-ops and leave the original Scylla component (with outdated metadata)
    // untouched.
    //
    // With the Scylla component in second position:
    //   1) the TOC is written and the Scylla component file already exists on
    //      disk when subsequent output() calls happen;
    //   2) later output() calls overwrite the Scylla component with the
    //      correct, updated metadata.
    //
    // There is no second slot with fewer than two components, and the Scylla
    // component may be absent, so both cases are checked before swapping.
    if (components.size() > 1) {
        auto scylla_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::Scylla; });
        auto second_it = std::next(components.begin());
        if (scylla_it != components.end() && scylla_it != second_it) {
            swap(*scylla_it, *second_it);
        }
    }

    auto gen = table.get_sstable_generation_generator()();
    auto files = co_await sstable->readable_file_for_all_components();
    for (auto it = components.cbegin(); it != components.cend(); ++it) {
        try {
            auto descriptor = sstable->get_descriptor(it->first);
            auto sstable_sink =
                    sstables::create_stream_sink(table.schema(),
                                                 table.get_sstables_manager(),
                                                 table.get_storage_options(),
                                                 sstables::sstable_state::normal,
                                                 sstables::sstable::component_basename(
                                                         table.schema()->ks_name(), table.schema()->cf_name(), descriptor.version, gen, descriptor.format, it->first),
                                                 sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend(), .leave_unsealed = true});
            auto out = co_await sstable_sink->output(foptions, stream_options);

            // From here on `out` is open, so every failure (including a failure
            // to open the source) must go through the cleanup below.
            input_stream<char> src;
            bool src_opened = false;
            std::exception_ptr eptr;
            try {
                auto component_file = files.at(it->first);
                if (it->first != sstables::component_type::Data) {
                    const auto fis_options = file_input_stream_options{.buffer_size = 128_KiB, .read_ahead = 2};
                    src = input_stream<char>(co_await sstable->get_storage().make_source(
                            *sstable, it->first, component_file, 0, std::numeric_limits<size_t>::max(), fis_options));
                } else {
                    auto permit = co_await db.obtain_reader_permit(table, "download_fully_contained_sstables", db::no_timeout, {});
                    // Compressed data is copied raw (on-disk bytes); uncompressed data is copied as is.
                    src = co_await (
                            sstable->get_compression()
                                    ? sstable->data_stream(0, sstable->ondisk_data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::yes)
                                    : sstable->data_stream(0, sstable->data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::no));
                }
                src_opened = true;
                co_await seastar::copy(src, out);
            } catch (...) {
                eptr = std::current_exception();
            }

            // co_await is not allowed inside a handler, hence the deferred
            // cleanup. Both streams are always closed; the first error wins.
            if (src_opened) {
                try {
                    co_await src.close();
                } catch (...) {
                    if (!eptr) {
                        eptr = std::current_exception();
                    }
                }
            }
            try {
                co_await out.close();
            } catch (...) {
                if (!eptr) {
                    eptr = std::current_exception();
                }
            }
            if (eptr) {
                co_await sstable_sink->abort();
                // Logged by the enclosing handler.
                std::rethrow_exception(eptr);
            }

            // The sink yields an sstable only once it considers the download complete.
            if (auto sst = co_await sstable_sink->close()) {
                const auto& shards = sstable->get_shards_for_this_sstable();
                if (shards.size() != 1) {
                    on_internal_error(logger, "Fully-contained sstable must belong to one shard only");
                }
                logger.debug("SSTable shards {}", fmt::join(shards, ", "));
                co_return minimal_sst_info{shards.front(), gen, descriptor.version, descriptor.format};
            }
        } catch (...) {
            logger.info("Error downloading SSTable component {}. Reason: {}", it->first, std::current_exception());
            throw;
        }
    }
    throw std::logic_error("SSTable must have at least one component");
}
|
||||
62
sstables_loader_helpers.hh
Normal file
62
sstables_loader_helpers.hh
Normal file
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include "dht/i_partitioner_fwd.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "sstables/generation_type.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "sstables/version.hh"
|
||||
#include "utils/log.hh"
|
||||
|
||||
#include <vector>
|
||||
|
||||
namespace replica {
|
||||
class table;
|
||||
class database;
|
||||
} // namespace replica
|
||||
|
||||
// Compact description of an sstable produced by download_sstable().
// Member order matters: the struct is brace-initialised positionally.
struct minimal_sst_info {
|
||||
// Shard the source sstable belongs to (download_sstable() requires exactly one).
shard_id _shard;
|
||||
// Generation assigned to the downloaded copy by the table's generation generator.
sstables::generation_type _generation;
|
||||
// Version taken from the source sstable's descriptor.
sstables::sstable_version_types _version;
|
||||
// Format taken from the source sstable's descriptor.
sstables::sstable_format_types _format;
|
||||
};
|
||||
|
||||
// Copies every component of `sstable` into the storage of `table` under a
// newly generated generation and returns the source sstable's shard together
// with the new generation and the source version/format.
//
// `db` supplies the reader permit used to stream the Data component. The
// source sstable must belong to exactly one shard; otherwise an internal error
// is raised. Failures are reported through `logger` and rethrown.
future<minimal_sst_info> download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger);
|
||||
|
||||
template <std::ranges::input_range Range>
|
||||
future<std::tuple<std::vector<sstables::shared_sstable>, std::vector<sstables::shared_sstable>>> get_sstables_for_tablet(Range&& sstables,
|
||||
const dht::token_range& tablet_range) {
|
||||
std::vector<sstables::shared_sstable> fully_contained;
|
||||
std::vector<sstables::shared_sstable> partially_contained;
|
||||
for (const auto& sst : sstables) {
|
||||
auto sst_first = sst->get_first_decorated_key().token();
|
||||
auto sst_last = sst->get_last_decorated_key().token();
|
||||
|
||||
// SSTable entirely after tablet -> no further SSTables (larger keys) can overlap
|
||||
if (tablet_range.after(sst_first, dht::token_comparator{})) {
|
||||
break;
|
||||
}
|
||||
// SSTable entirely before tablet -> skip and continue scanning later (larger keys)
|
||||
if (tablet_range.before(sst_last, dht::token_comparator{})) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) {
|
||||
fully_contained.push_back(sst);
|
||||
} else {
|
||||
partially_contained.push_back(sst);
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return std::make_tuple(std::move(fully_contained), std::move(partially_contained));
|
||||
}
|
||||
Reference in New Issue
Block a user