diff --git a/CMakeLists.txt b/CMakeLists.txt index 107e38cf2f..1bb41a58c6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -299,6 +299,7 @@ target_sources(scylla-main serializer.cc service/direct_failure_detector/failure_detector.cc sstables_loader.cc + sstables_loader_helpers.cc table_helper.cc tasks/task_handler.cc tasks/task_manager.cc diff --git a/configure.py b/configure.py index 300b699a79..203a9439a5 100755 --- a/configure.py +++ b/configure.py @@ -1326,6 +1326,7 @@ scylla_core = (['message/messaging_service.cc', 'ent/ldap/ldap_connection.cc', 'reader_concurrency_semaphore.cc', 'sstables_loader.cc', + 'sstables_loader_helpers.cc', 'utils/utf8.cc', 'utils/ascii.cc', 'utils/like_matcher.cc', diff --git a/sstables_loader.cc b/sstables_loader.cc index 240c153075..9d241eedf0 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -31,6 +31,7 @@ #include "locator/abstract_replication_strategy.hh" #include "message/messaging_service.hh" #include "service/storage_service.hh" +#include "sstables_loader_helpers.hh" #include "sstables/object_storage_client.hh" #include "utils/rjson.hh" @@ -208,12 +209,6 @@ private: return result; } - struct minimal_sst_info { - shard_id _shard; - sstables::generation_type _generation; - sstables::sstable_version_types _version; - sstables::sstable_format_types _format; - }; using sst_classification_info = std::vector>; future<> attach_sstable(shard_id from_shard, const sstring& ks, const sstring& cf, const minimal_sst_info& min_info) const { @@ -258,100 +253,6 @@ private: } } - static future download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger) { - constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true}; - constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10}; - sst_classification_info downloaded_sstables(smp::count); - auto components = sstable->all_components(); - - // Move the TOC to the front to be processed first since `sstables::create_stream_sink` takes care - // of creating behind the scene TemporaryTOC instead of usual one. This assures that in case of failure - // this partially created SSTable will be cleaned up properly at some point. - auto toc_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::TOC; }); - if (toc_it != components.begin()) { - swap(*toc_it, components.front()); - } - - // Ensure the Scylla component is processed second. - // - // The sstable_sink->output() call for each component may invoke load_metadata() - // and save_metadata(), but these functions only operate correctly if the Scylla - // component file already exists on disk. If the Scylla component is written first, - // load_metadata()/save_metadata() become no-ops, leaving the original Scylla - // component (with outdated metadata) untouched. - // - // By placing the Scylla component second, we guarantee that: - // 1) The first component (TOC) is written and the Scylla component file already - // exists on disk when subsequent output() calls happen. - // 2) Later output() calls will overwrite the Scylla component with the correct, - // updated metadata. - // - // In short: Scylla must be written second so that all following output() calls - // can properly update its metadata instead of silently skipping it. - auto scylla_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::Scylla; }); - if (scylla_it != std::next(components.begin())) { - swap(*scylla_it, *std::next(components.begin())); - } - - auto gen = table.get_sstable_generation_generator()(); - auto files = co_await sstable->readable_file_for_all_components(); - for (auto it = components.cbegin(); it != components.cend(); ++it) { - try { - auto descriptor = sstable->get_descriptor(it->first); - auto sstable_sink = sstables::create_stream_sink( - table.schema(), - table.get_sstables_manager(), - table.get_storage_options(), - sstables::sstable_state::normal, - sstables::sstable::component_basename( - table.schema()->ks_name(), table.schema()->cf_name(), descriptor.version, gen, descriptor.format, it->first), - sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend(), - .leave_unsealed = true}); - auto out = co_await sstable_sink->output(foptions, stream_options); - - input_stream src(co_await [ &it, sstable, f = files.at(it->first), &db, &table]() -> future> { - const auto fis_options = file_input_stream_options{.buffer_size = 128_KiB, .read_ahead = 2}; - - if (it->first != sstables::component_type::Data) { - co_return input_stream( - co_await sstable->get_storage().make_source(*sstable, it->first, f, 0, std::numeric_limits::max(), fis_options)); - } - auto permit = co_await db.obtain_reader_permit(table, "download_fully_contained_sstables", db::no_timeout, {}); - co_return co_await ( - sstable->get_compression() - ? sstable->data_stream(0, sstable->ondisk_data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::yes) - : sstable->data_stream(0, sstable->data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::no)); - }()); - - std::exception_ptr eptr; - try { - co_await seastar::copy(src, out); - } catch (...) { - eptr = std::current_exception(); - logger.info("Error downloading SSTable component {}. Reason: {}", it->first, eptr); - } - co_await src.close(); - co_await out.close(); - if (eptr) { - co_await sstable_sink->abort(); - std::rethrow_exception(eptr); - } - if (auto sst = co_await sstable_sink->close()) { - const auto& shards = sstable->get_shards_for_this_sstable(); - if (shards.size() != 1) { - on_internal_error(logger, "Fully-contained sstable must belong to one shard only"); - } - logger.debug("SSTable shards {}", fmt::join(shards, ", ")); - co_return minimal_sst_info{shards.front(), gen, descriptor.version, descriptor.format}; - } - } catch (...) { - logger.info("Error downloading SSTable component {}. Reason: {}", it->first, std::current_exception()); - throw; - } - } - throw std::logic_error("SSTable must have at least one component"); - } - future download_fully_contained_sstables(std::vector sstables) const { sst_classification_info downloaded_sstables(smp::count); for (const auto& sstable : sstables) { @@ -365,9 +266,6 @@ private: friend future> get_sstables_for_tablets_for_tests(const std::vector& sstables, std::vector&& tablets_ranges); - template - static future, std::vector>> - get_sstables_for_tablet(Range&& sstables, const dht::token_range& tablet_range); // Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from // the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else. static future> get_sstables_for_tablets(const std::vector& sstables, @@ -510,34 +408,6 @@ public: } }; -template -future, std::vector>> -tablet_sstable_streamer::get_sstables_for_tablet(Range&& sstables, const dht::token_range& tablet_range) { - std::vector fully_contained; - std::vector partially_contained; - for (const auto& sst : sstables) { - auto sst_first = sst->get_first_decorated_key().token(); - auto sst_last = sst->get_last_decorated_key().token(); - - // SSTable entirely after tablet -> no further SSTables (larger keys) can overlap - if (tablet_range.after(sst_first, dht::token_comparator{})) { - break; - } - // SSTable entirely before tablet -> skip and continue scanning later (larger keys) - if (tablet_range.before(sst_last, dht::token_comparator{})) { - continue; - } - - if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) { - fully_contained.push_back(sst); - } else { - partially_contained.push_back(sst); - } - co_await coroutine::maybe_yield(); - } - co_return std::make_tuple(std::move(fully_contained), std::move(partially_contained)); -} - future> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector& sstables, std::vector&& tablets_ranges) { auto tablets_sstables = diff --git a/sstables_loader_helpers.cc b/sstables_loader_helpers.cc new file mode 100644 index 0000000000..72e4680b57 --- /dev/null +++ b/sstables_loader_helpers.cc @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#include "sstables_loader_helpers.hh" + +#include +#include +#include +#include "replica/database.hh" +#include "sstables/shared_sstable.hh" +#include "sstables/sstables.hh" + +future download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger) { + constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true}; + constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10}; + auto components = sstable->all_components(); + + // Move the TOC to the front to be processed first since `sstables::create_stream_sink` takes care + // of creating behind the scene TemporaryTOC instead of usual one. This assures that in case of failure + // this partially created SSTable will be cleaned up properly at some point. + auto toc_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::TOC; }); + if (toc_it != components.begin()) { + swap(*toc_it, components.front()); + } + + // Ensure the Scylla component is processed second. + // + // The sstable_sink->output() call for each component may invoke load_metadata() + // and save_metadata(), but these functions only operate correctly if the Scylla + // component file already exists on disk. If the Scylla component is written first, + // load_metadata()/save_metadata() become no-ops, leaving the original Scylla + // component (with outdated metadata) untouched. + // + // By placing the Scylla component second, we guarantee that: + // 1) The first component (TOC) is written and the Scylla component file already + // exists on disk when subsequent output() calls happen. + // 2) Later output() calls will overwrite the Scylla component with the correct, + // updated metadata. + // + // In short: Scylla must be written second so that all following output() calls + // can properly update its metadata instead of silently skipping it. + auto scylla_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::Scylla; }); + if (scylla_it != std::next(components.begin())) { + swap(*scylla_it, *std::next(components.begin())); + } + + auto gen = table.get_sstable_generation_generator()(); + auto files = co_await sstable->readable_file_for_all_components(); + for (auto it = components.cbegin(); it != components.cend(); ++it) { + try { + auto descriptor = sstable->get_descriptor(it->first); + auto sstable_sink = + sstables::create_stream_sink(table.schema(), + table.get_sstables_manager(), + table.get_storage_options(), + sstables::sstable_state::normal, + sstables::sstable::component_basename( + table.schema()->ks_name(), table.schema()->cf_name(), descriptor.version, gen, descriptor.format, it->first), + sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend(), .leave_unsealed = true}); + auto out = co_await sstable_sink->output(foptions, stream_options); + + input_stream src(co_await [&it, sstable, f = files.at(it->first), &db, &table]() -> future> { + const auto fis_options = file_input_stream_options{.buffer_size = 128_KiB, .read_ahead = 2}; + + if (it->first != sstables::component_type::Data) { + co_return input_stream( + co_await sstable->get_storage().make_source(*sstable, it->first, f, 0, std::numeric_limits::max(), fis_options)); + } + auto permit = co_await db.obtain_reader_permit(table, "download_fully_contained_sstables", db::no_timeout, {}); + co_return co_await ( + sstable->get_compression() + ? sstable->data_stream(0, sstable->ondisk_data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::yes) + : sstable->data_stream(0, sstable->data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::no)); + }()); + + std::exception_ptr eptr; + try { + co_await seastar::copy(src, out); + } catch (...) { + eptr = std::current_exception(); + logger.info("Error downloading SSTable component {}. Reason: {}", it->first, eptr); + } + co_await src.close(); + co_await out.close(); + if (eptr) { + co_await sstable_sink->abort(); + std::rethrow_exception(eptr); + } + if (auto sst = co_await sstable_sink->close()) { + const auto& shards = sstable->get_shards_for_this_sstable(); + if (shards.size() != 1) { + on_internal_error(logger, "Fully-contained sstable must belong to one shard only"); + } + logger.debug("SSTable shards {}", fmt::join(shards, ", ")); + co_return minimal_sst_info{shards.front(), gen, descriptor.version, descriptor.format}; + } + } catch (...) { + logger.info("Error downloading SSTable component {}. Reason: {}", it->first, std::current_exception()); + throw; + } + } + throw std::logic_error("SSTable must have at least one component"); +} diff --git a/sstables_loader_helpers.hh b/sstables_loader_helpers.hh new file mode 100644 index 0000000000..0332b6b7b9 --- /dev/null +++ b/sstables_loader_helpers.hh @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#pragma once + +#include +#include +#include "dht/i_partitioner_fwd.hh" +#include "dht/token.hh" +#include "sstables/generation_type.hh" +#include "sstables/shared_sstable.hh" +#include "sstables/version.hh" +#include "utils/log.hh" + +#include + +namespace replica { +class table; +class database; +} // namespace replica + +struct minimal_sst_info { + shard_id _shard; + sstables::generation_type _generation; + sstables::sstable_version_types _version; + sstables::sstable_format_types _format; +}; + +future download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger); + +template +future, std::vector>> get_sstables_for_tablet(Range&& sstables, + const dht::token_range& tablet_range) { + std::vector fully_contained; + std::vector partially_contained; + for (const auto& sst : sstables) { + auto sst_first = sst->get_first_decorated_key().token(); + auto sst_last = sst->get_last_decorated_key().token(); + + // SSTable entirely after tablet -> no further SSTables (larger keys) can overlap + if (tablet_range.after(sst_first, dht::token_comparator{})) { + break; + } + // SSTable entirely before tablet -> skip and continue scanning later (larger keys) + if (tablet_range.before(sst_last, dht::token_comparator{})) { + continue; + } + + if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) { + fully_contained.push_back(sst); + } else { + partially_contained.push_back(sst); + } + co_await coroutine::maybe_yield(); + } + co_return std::make_tuple(std::move(fully_contained), std::move(partially_contained)); +}