diff --git a/compaction/compaction.cc b/compaction/compaction.cc index bac75f9441..e32e1f5b9a 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -424,11 +424,7 @@ public: } } formatted_sstables_list& operator+=(const shared_sstable& sst) { - if (_include_origin) { - _ssts.emplace_back(format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin())); - } else { - _ssts.emplace_back(format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level())); - } + _ssts.emplace_back(to_string(sst, _include_origin)); return *this; } friend std::ostream& operator<<(std::ostream& os, const formatted_sstables_list& lst); @@ -475,6 +471,9 @@ protected: // used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys. std::optional _selector; std::unordered_set _compacting_for_max_purgeable_func; + // optional owned_ranges vector for cleanup; + owned_ranges_ptr _owned_ranges = {}; + std::optional _owned_ranges_checker; // Garbage collected sstables that are sealed but were not added to SSTable set yet. std::vector _unused_garbage_collected_sstables; // Garbage collected sstables that were added to SSTable set and should be eventually removed from it. @@ -503,6 +502,8 @@ protected: , _sstable_set(std::move(descriptor.all_sstables_snapshot)) , _selector(_sstable_set ? _sstable_set->make_incremental_selector() : std::optional{}) , _compacting_for_max_purgeable_func(std::unordered_set(_sstables.begin(), _sstables.end())) + , _owned_ranges(std::move(descriptor.owned_ranges)) + , _owned_ranges_checker(_owned_ranges ? 
std::optional(*_owned_ranges) : std::nullopt) { for (auto& sst : _sstables) { _stats_collector.update(sst->get_encoding_stats_for_compaction()); @@ -621,6 +622,21 @@ protected: bool enable_garbage_collected_sstable_writer() const noexcept { return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits::max(); } + + flat_mutation_reader_v2::filter make_partition_filter() const { + return [this] (const dht::decorated_key& dk) { +#ifdef SEASTAR_DEBUG + // sstables should never be shared with other shards at this point. + assert(dht::shard_of(*_schema, dk.token()) == this_shard_id()); +#endif + + if (!_owned_ranges_checker->belongs_to_current_node(dk.token())) { + log_trace("Token {} does not belong to this node, skipping", dk.token()); + return false; + } + return true; + }; + } public: compaction& operator=(const compaction&) = delete; compaction(const compaction&) = delete; @@ -634,6 +650,17 @@ private: // Default range sstable reader that will only return mutation that belongs to current shard. virtual flat_mutation_reader_v2 make_sstable_reader() const = 0; + // Make a filtering reader if needed + // FIXME: the sstable reader itself should be pass the owned ranges + // so it can skip over the disowned ranges efficiently using the index. 
+ // Ref https://github.com/scylladb/scylladb/issues/12998 + flat_mutation_reader_v2 setup_sstable_reader() const { + if (!_owned_ranges_checker) { + return make_sstable_reader(); + } + return make_filtering_reader(make_sstable_reader(), make_partition_filter()); + } + virtual sstables::sstable_set make_sstable_set_for_input() const { return _table_s.get_compaction_strategy().make_sstable_set(_schema); } @@ -703,7 +730,7 @@ private: }); }); const auto& gc_state = _table_s.get_tombstone_gc_state(); - return consumer(make_compacting_reader(make_sstable_reader(), compaction_time, max_purgeable_func(), gc_state)); + return consumer(make_compacting_reader(setup_sstable_reader(), compaction_time, max_purgeable_func(), gc_state)); } future<> consume() { @@ -739,7 +766,7 @@ private: reader.consume_in_thread(std::move(cfc)); }); }); - return consumer(make_sstable_reader()); + return consumer(setup_sstable_reader()); } virtual reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) { @@ -1175,8 +1202,6 @@ private: }; class cleanup_compaction final : public regular_compaction { - owned_ranges_ptr _owned_ranges; - dht::incremental_owned_ranges_checker _owned_ranges_checker; private: // Called in a seastar thread dht::partition_range_vector @@ -1199,22 +1224,10 @@ protected: return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)}; } -private: - cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, owned_ranges_ptr owned_ranges) - : regular_compaction(table_s, std::move(descriptor), cdata) - , _owned_ranges(std::move(owned_ranges)) - , _owned_ranges_checker(*_owned_ranges) - { - } - public: - cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::cleanup opts) - : cleanup_compaction(table_s, std::move(descriptor), cdata, std::move(opts.owned_ranges)) {} - 
cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::upgrade opts) - : cleanup_compaction(table_s, std::move(descriptor), cdata, std::move(opts.owned_ranges)) {} - - flat_mutation_reader_v2 make_sstable_reader() const override { - return make_filtering_reader(regular_compaction::make_sstable_reader(), make_partition_filter()); + cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata) + : regular_compaction(table_s, std::move(descriptor), cdata) + { } std::string_view report_start_desc() const override { @@ -1224,21 +1237,6 @@ public: std::string_view report_finish_desc() const override { return "Cleaned"; } - - flat_mutation_reader_v2::filter make_partition_filter() const { - return [this] (const dht::decorated_key& dk) { -#ifdef SEASTAR_DEBUG - // sstables should never be shared with other shards at this point. - assert(dht::shard_of(*_schema, dk.token()) == this_shard_id()); -#endif - - if (!_owned_ranges_checker.belongs_to_current_node(dk.token())) { - log_trace("Token {} does not belong to this node, skipping", dk.token()); - return false; - } - return true; - }; - } }; class scrub_compaction final : public regular_compaction { @@ -1710,11 +1708,11 @@ static std::unique_ptr make_compaction(table_state& table_s, sstable std::unique_ptr operator()(compaction_type_options::regular) { return std::make_unique(table_s, std::move(descriptor), cdata); } - std::unique_ptr operator()(compaction_type_options::cleanup options) { - return std::make_unique(table_s, std::move(descriptor), cdata, std::move(options)); + std::unique_ptr operator()(compaction_type_options::cleanup) { + return std::make_unique(table_s, std::move(descriptor), cdata); } - std::unique_ptr operator()(compaction_type_options::upgrade options) { - return std::make_unique(table_s, std::move(descriptor), cdata, std::move(options)); + std::unique_ptr operator()(compaction_type_options::upgrade) 
{ + return std::make_unique(table_s, std::move(descriptor), cdata); } std::unique_ptr operator()(compaction_type_options::scrub scrub_options) { return std::make_unique(table_s, std::move(descriptor), cdata, scrub_options); diff --git a/compaction/compaction_descriptor.hh b/compaction/compaction_descriptor.hh index 28727a6102..449b581ce7 100644 --- a/compaction/compaction_descriptor.hh +++ b/compaction/compaction_descriptor.hh @@ -19,16 +19,7 @@ #include "utils/UUID.hh" #include "dht/i_partitioner.hh" #include "compaction_weight_registration.hh" - -namespace compaction { - -using owned_ranges_ptr = lw_shared_ptr; - -inline owned_ranges_ptr make_owned_ranges_ptr(dht::token_range_vector&& ranges) { - return make_lw_shared(std::move(ranges)); -} - -} // namespace compaction +#include "compaction_fwd.hh" namespace sstables { @@ -64,10 +55,8 @@ public: struct regular { }; struct cleanup { - compaction::owned_ranges_ptr owned_ranges; }; struct upgrade { - compaction::owned_ranges_ptr owned_ranges; }; struct scrub { enum class mode { @@ -112,12 +101,12 @@ public: return compaction_type_options(regular{}); } - static compaction_type_options make_cleanup(compaction::owned_ranges_ptr owned_ranges) { - return compaction_type_options(cleanup{std::move(owned_ranges)}); + static compaction_type_options make_cleanup() { + return compaction_type_options(cleanup{}); } - static compaction_type_options make_upgrade(compaction::owned_ranges_ptr owned_ranges) { - return compaction_type_options(upgrade{std::move(owned_ranges)}); + static compaction_type_options make_upgrade() { + return compaction_type_options(upgrade{}); } static compaction_type_options make_scrub(scrub::mode mode) { @@ -160,6 +149,8 @@ struct compaction_descriptor { // The options passed down to the compaction code. // This also selects the kind of compaction to do. 
compaction_type_options options = compaction_type_options::make_regular(); + // If engaged, compaction will cleanup the input sstables by skipping non-owned ranges. + compaction::owned_ranges_ptr owned_ranges; compaction_sstable_creator_fn creator; compaction_sstable_replacer_fn replacer; @@ -179,12 +170,14 @@ struct compaction_descriptor { int level = default_level, uint64_t max_sstable_bytes = default_max_sstable_bytes, run_id run_identifier = run_id::create_random_id(), - compaction_type_options options = compaction_type_options::make_regular()) + compaction_type_options options = compaction_type_options::make_regular(), + compaction::owned_ranges_ptr owned_ranges_ = {}) : sstables(std::move(sstables)) , level(level) , max_sstable_bytes(max_sstable_bytes) , run_identifier(run_identifier) , options(options) + , owned_ranges(std::move(owned_ranges_)) , io_priority(io_priority) {} diff --git a/compaction/compaction_fwd.hh b/compaction/compaction_fwd.hh new file mode 100644 index 0000000000..bd8b118134 --- /dev/null +++ b/compaction/compaction_fwd.hh @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2023-present ScyllaDB + * + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "dht/i_partitioner.hh" + +namespace compaction { + +class table_state; +class strategy_control; +struct compaction_state; + +using owned_ranges_ptr = lw_shared_ptr; + +inline owned_ranges_ptr make_owned_ranges_ptr(dht::token_range_vector&& ranges) { + return make_lw_shared(std::move(ranges)); +} + +} // namespace compaction diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index c6c9116491..8c49854cbf 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -32,14 +32,16 @@ using namespace compaction; class compacting_sstable_registration { compaction_manager& _cm; + compaction::compaction_state& _cs; std::unordered_set _compacting; public: - explicit compacting_sstable_registration(compaction_manager& cm) 
noexcept + explicit compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs) noexcept : _cm(cm) + , _cs(cs) { } - compacting_sstable_registration(compaction_manager& cm, std::vector compacting) - : compacting_sstable_registration(cm) + compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs, std::vector compacting) + : compacting_sstable_registration(cm, cs) { register_compacting(compacting); } @@ -57,6 +59,7 @@ public: compacting_sstable_registration(compacting_sstable_registration&& other) noexcept : _cm(other._cm) + , _cs(other._cs) , _compacting(std::move(other._compacting)) { } @@ -77,6 +80,10 @@ public: _cm.deregister_compacting_sstables(sstables.begin(), sstables.end()); for (const auto& sst : sstables) { _compacting.erase(sst); + _cs.sstables_requiring_cleanup.erase(sst); + } + if (_cs.sstables_requiring_cleanup.empty()) { + _cs.owned_ranges_ptr = nullptr; } } }; @@ -205,9 +212,15 @@ std::vector in_strategy_sstables(table_state& table_s) })); } -std::vector compaction_manager::get_candidates(table_state& t) { +std::vector compaction_manager::get_candidates(table_state& t) const { + return get_candidates(t, *t.main_sstable_set().all()); +} + +template +requires std::convertible_to, sstables::shared_sstable> +std::vector compaction_manager::get_candidates(table_state& t, const Range& sstables) const { std::vector candidates; - candidates.reserve(t.main_sstable_set().size()); + candidates.reserve(sstables.size()); // prevents sstables that belongs to a partial run being generated by ongoing compaction from being // selected for compaction, which could potentially result in wrong behavior. auto partial_run_identifiers = boost::copy_range>(_tasks @@ -215,7 +228,10 @@ std::vector compaction_manager::get_candidates(table_s | boost::adaptors::transformed(std::mem_fn(&compaction_task_executor::output_run_id))); // Filter out sstables that are being compacted. 
- for (auto& sst : in_strategy_sstables(t)) { + for (const auto& sst : sstables) { + if (!sstables::is_eligible_for_compaction(sst)) { + continue; + } if (_compacting_sstables.contains(sst)) { continue; } @@ -261,7 +277,7 @@ private: virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; -compaction_manager::compaction_state& compaction_manager::get_compaction_state(table_state* t) { +compaction::compaction_state& compaction_manager::get_compaction_state(table_state* t) { try { return _compaction_state.at(t); } catch (std::out_of_range&) { @@ -345,6 +361,24 @@ future compaction_task_executor::compact_sstables(s } }; + // retrieve owned_ranges if_required + if (!descriptor.owned_ranges) { + std::vector sstables_requiring_cleanup; + const auto& cs = _cm.get_compaction_state(_compacting_table); + for (const auto& sst : descriptor.sstables) { + if (cs.sstables_requiring_cleanup.contains(sst)) { + sstables_requiring_cleanup.emplace_back(sst); + } + } + if (!sstables_requiring_cleanup.empty()) { + cmlog.info("The following SSTables require cleaned up in this compaction: {}", sstables_requiring_cleanup); + if (!cs.owned_ranges_ptr) { + on_internal_error_noexcept(cmlog, "SSTables require cleanup but compaction state has null owned ranges"); + } + descriptor.owned_ranges = cs.owned_ranges_ptr; + } + } + co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t); } future<> compaction_task_executor::update_history(table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) { @@ -413,7 +447,7 @@ protected: table_state* t = _compacting_table; sstables::compaction_strategy cs = t->get_compaction_strategy(); sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t)); - auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); + auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(t), 
descriptor.sstables); auto release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { compacting.release_compacting(exhausted_sstables); }; @@ -568,7 +602,7 @@ inline compaction_controller make_compaction_controller(const compaction_manager return compaction_controller(csg, static_shares, 250ms, std::move(fn)); } -compaction_manager::compaction_state::~compaction_state() { +compaction::compaction_state::~compaction_state() { compaction_done.broken(); } @@ -1011,7 +1045,7 @@ protected: _cm.postpone_compaction_for_table(&t); co_return std::nullopt; } - auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); + auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), descriptor.sstables); auto weight_r = compaction_weight_registration(&_cm, weight); auto release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { compacting.release_compacting(exhausted_sstables); @@ -1268,15 +1302,17 @@ namespace compaction { class rewrite_sstables_compaction_task_executor : public sstables_task_executor { sstables::compaction_type_options _options; + owned_ranges_ptr _owned_ranges_ptr; compacting_sstable_registration _compacting; compaction_manager::can_purge_tombstones _can_purge; public: - rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, + rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, std::vector sstables, compacting_sstable_registration compacting, compaction_manager::can_purge_tombstones can_purge) : sstables_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables)) , _options(std::move(options)) + , _owned_ranges_ptr(std::move(owned_ranges_ptr)) , _compacting(std::move(compacting)) , _can_purge(can_purge) {} @@ -1308,7 +1344,7 @@ private: auto run_identifier = 
sst->run_identifier(); // FIXME: this compaction should run with maintenance priority. auto descriptor = sstables::compaction_descriptor({ sst }, service::get_local_compaction_priority(), - sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options); + sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options, _owned_ranges_ptr); // Releases reference to cleaned sstable such that respective used disk space can be freed. auto release_exhausted = [this] (const std::vector& exhausted_sstables) { @@ -1343,7 +1379,7 @@ private: template requires std::derived_from -future compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { +future compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, Args... args) { if (_state != state::enabled) { co_return std::nullopt; } @@ -1353,7 +1389,7 @@ future compaction_manager::perform_tas // in the re-write, we need to barrier out any previously running // compaction. std::vector sstables; - compacting_sstable_registration compacting(*this); + compacting_sstable_registration compacting(*this, get_compaction_state(&t)); co_await run_with_compaction_disabled(t, [&sstables, &compacting, get_func = std::move(get_func)] () -> future<> { // Getting sstables and registering them as compacting must be atomic, to avoid a race condition where // regular compaction runs in between and picks the same files. 
@@ -1367,11 +1403,11 @@ future compaction_manager::perform_tas return a->data_size() > b->data_size(); }); }); - co_return co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); + co_return co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(owned_ranges_ptr), std::move(sstables), std::move(compacting), std::forward(args)...)); } -future compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { - return perform_task_on_all_files(t, std::move(options), std::move(get_func), can_purge); +future compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, can_purge_tombstones can_purge) { + return perform_task_on_all_files(t, std::move(options), std::move(owned_ranges_ptr), std::move(get_func), can_purge); } namespace compaction { @@ -1452,13 +1488,15 @@ namespace compaction { class cleanup_sstables_compaction_task_executor : public compaction_task_executor { const sstables::compaction_type_options _cleanup_options; + owned_ranges_ptr _owned_ranges_ptr; compacting_sstable_registration _compacting; std::vector _pending_cleanup_jobs; public: - cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, + cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, std::vector candidates, compacting_sstable_registration compacting) : compaction_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type()))) , _cleanup_options(std::move(options)) + , _owned_ranges_ptr(std::move(owned_ranges_ptr)) , _compacting(std::move(compacting)) , 
_pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(*t, std::move(candidates))) { @@ -1479,6 +1517,7 @@ protected: while (!_pending_cleanup_jobs.empty() && can_proceed()) { auto active_job = std::move(_pending_cleanup_jobs.back()); active_job.options = _cleanup_options; + active_job.owned_ranges = _owned_ranges_ptr; co_await run_cleanup_job(std::move(active_job)); _pending_cleanup_jobs.pop_back(); _cm._stats.pending_tasks--; @@ -1523,8 +1562,7 @@ private: } bool needs_cleanup(const sstables::shared_sstable& sst, - const dht::token_range_vector& sorted_owned_ranges, - schema_ptr s) { + const dht::token_range_vector& sorted_owned_ranges) { auto first_token = sst->get_first_decorated_key().token(); auto last_token = sst->get_last_decorated_key().token(); dht::token_range sst_token_range = dht::token_range::make(first_token, last_token); @@ -1544,6 +1582,22 @@ bool needs_cleanup(const sstables::shared_sstable& sst, return true; } +bool compaction_manager::update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, owned_ranges_ptr owned_ranges_ptr) { + auto& cs = get_compaction_state(&t); + if (owned_ranges_ptr && needs_cleanup(sst, *owned_ranges_ptr)) { + cs.sstables_requiring_cleanup.insert(sst); + return true; + } else { + cs.sstables_requiring_cleanup.erase(sst); + return false; + } +} + +bool compaction_manager::requires_cleanup(table_state& t, const sstables::shared_sstable& sst) const { + const auto& cs = get_compaction_state(&t); + return cs.sstables_requiring_cleanup.contains(sst); +} + future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t) { auto check_for_cleanup = [this, &t] { return boost::algorithm::any_of(_tasks, [&t] (auto& task) { @@ -1555,20 +1609,33 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range t.schema()->ks_name(), t.schema()->cf_name())); } + if (sorted_owned_ranges->empty()) { + throw std::runtime_error("cleanup 
request failed: sorted_owned_ranges is empty"); + } + + // Called with compaction_disabled auto get_sstables = [this, &t, sorted_owned_ranges] () -> future> { return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { - auto schema = t.schema(); - auto sstables = std::vector{}; - const auto candidates = get_candidates(t); - std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { - seastar::thread::maybe_yield(); - return sorted_owned_ranges->empty() || needs_cleanup(sst, *sorted_owned_ranges, schema); - }); - return sstables; + auto update_sstables_cleanup_state = [&] (const sstables::sstable_set& set) { + set.for_each_sstable([&] (const sstables::shared_sstable& sst) { + update_sstable_cleanup_state(t, sst, sorted_owned_ranges); + seastar::thread::maybe_yield(); + }); + }; + update_sstables_cleanup_state(t.main_sstable_set()); + update_sstables_cleanup_state(t.maintenance_sstable_set()); + // Some sstables may remain in sstables_requiring_cleanup + // for later processing if they can't be cleaned up right now. + // They are erased from sstables_requiring_cleanup by compacting.release_compacting + auto& cs = get_compaction_state(&t); + if (!cs.sstables_requiring_cleanup.empty()) { + cs.owned_ranges_ptr = std::move(sorted_owned_ranges); + } + return get_candidates(t, cs.sstables_requiring_cleanup); }); }; - co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), + co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(), std::move(sorted_owned_ranges), std::move(get_sstables)); } @@ -1597,7 +1664,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own // Note that we potentially could be doing multiple // upgrades here in parallel, but that is really the users // problem. 
- return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(std::move(sorted_owned_ranges)), std::move(get_sstables)).discard_result(); + return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables)).discard_result(); } // Submit a table to be scrubbed and wait for its termination. @@ -1606,7 +1673,8 @@ future compaction_manager::perform_sst if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(t); } - return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), [&t, opts] { + owned_ranges_ptr owned_ranges_ptr = {}; + return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] { auto all_sstables = get_all_sstables(t); std::vector sstables = boost::copy_range>(all_sstables | boost::adaptors::filtered([&opts] (const sstables::shared_sstable& sst) { @@ -1627,7 +1695,7 @@ future compaction_manager::perform_sst }, can_purge_tombstones::no); } -compaction_manager::compaction_state::compaction_state(table_state& t) +compaction::compaction_state::compaction_state(table_state& t) : backlog_tracker(t.get_compaction_strategy().make_backlog_tracker()) { } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index c8e9bd32a9..9faaf10cf0 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -22,6 +22,7 @@ #include #include #include "log.hh" +#include "sstables/shared_sstable.hh" #include "utils/exponential_backoff_retry.hh" #include "utils/updateable_value.hh" #include "utils/serialized_action.hh" @@ -33,6 +34,7 @@ #include "compaction_weight_registration.hh" #include "compaction_backlog_manager.hh" #include "compaction/task_manager_module.hh" +#include "compaction_state.hh" #include "strategy_control.hh" #include "backlog_controller.hh" #include "seastarx.hh" @@ -83,31 
+85,6 @@ public: utils::updateable_value static_shares = utils::updateable_value(0); utils::updateable_value throughput_mb_per_sec = utils::updateable_value(0); }; -private: - struct compaction_state { - // Used both by compaction tasks that refer to the compaction_state - // and by any function running under run_with_compaction_disabled(). - seastar::gate gate; - - // Prevents table from running major and minor compaction at the same time. - rwlock lock; - - // Raised by any function running under run_with_compaction_disabled(); - long compaction_disabled_counter = 0; - - // Signaled whenever a compaction task completes. - condition_variable compaction_done; - - compaction_backlog_tracker backlog_tracker; - - explicit compaction_state(table_state& t); - compaction_state(compaction_state&&) = delete; - ~compaction_state(); - - bool compaction_disabled() const noexcept { - return compaction_disabled_counter > 0; - } - }; public: class can_purge_tombstones_tag; @@ -203,7 +180,11 @@ private: void deregister_weight(int weight); // Get candidates for compaction strategy, which are all sstables but the ones being compacted. - std::vector get_candidates(compaction::table_state& t); + std::vector get_candidates(compaction::table_state& t) const; + + template + requires std::convertible_to, sstables::shared_sstable> + std::vector get_candidates(table_state& t, const Range& sstables) const; template requires std::same_as || std::sentinel_for @@ -216,6 +197,9 @@ private: // gets the table's compaction state // throws std::out_of_range exception if not found. compaction_state& get_compaction_state(compaction::table_state* t); + const compaction_state& get_compaction_state(compaction::table_state* t) const { + return const_cast(this)->get_compaction_state(t); + } // Return true if compaction manager is enabled and // table still exists and compaction is not disabled for the table. 
@@ -236,9 +220,9 @@ private: // by retrieving set of candidates only after all compactions for table T were stopped, if any. template requires std::derived_from - future perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args); + future perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, owned_ranges_ptr, get_candidates_func, Args... args); - future rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); + future rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, owned_ranges_ptr, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); // Stop all fibers, without waiting. Safe to be called multiple times. void do_stop() noexcept; @@ -343,7 +327,7 @@ public: class compaction_reenabler { compaction_manager& _cm; compaction::table_state* _table; - compaction_manager::compaction_state& _compaction_state; + compaction::compaction_state& _compaction_state; gate::holder _holder; public: @@ -356,7 +340,7 @@ public: return _table; } - const compaction_manager::compaction_state& compaction_state() const noexcept { + const compaction::compaction_state& compaction_state() const noexcept { return _compaction_state; } }; @@ -420,6 +404,12 @@ public: return _tombstone_gc_state; }; + // Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set. + bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, owned_ranges_ptr owned_ranges_ptr); + + // checks if the sstable is in the respective compaction_state.sstables_requiring_cleanup set. 
+ bool requires_cleanup(table_state& t, const sstables::shared_sstable& sst) const; + friend class compacting_sstable_registration; friend class compaction_weight_registration; friend class compaction_manager_test; @@ -457,7 +447,7 @@ public: protected: compaction_manager& _cm; ::compaction::table_state* _compacting_table = nullptr; - compaction_manager::compaction_state& _compaction_state; + compaction::compaction_state& _compaction_state; sstables::compaction_data _compaction_data; state _state = state::none; @@ -562,7 +552,7 @@ std::ostream& operator<<(std::ostream& os, const compaction::compaction_task_exe } -bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s); +bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges); // Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir. std::vector in_strategy_sstables(compaction::table_state& table_s); diff --git a/compaction/compaction_state.hh b/compaction/compaction_state.hh new file mode 100644 index 0000000000..729992e9b1 --- /dev/null +++ b/compaction/compaction_state.hh @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include +#include +#include +#include "seastarx.hh" + +#include "compaction/compaction_fwd.hh" +#include "compaction/compaction_backlog_manager.hh" + +namespace compaction { + +struct compaction_state { + // Used both by compaction tasks that refer to the compaction_state + // and by any function running under run_with_compaction_disabled(). + seastar::gate gate; + + // Prevents table from running major and minor compaction at the same time. + seastar::rwlock lock; + + // Raised by any function running under run_with_compaction_disabled(); + long compaction_disabled_counter = 0; + + // Signaled whenever a compaction task completes. 
+ condition_variable compaction_done; + + compaction_backlog_tracker backlog_tracker; + + std::unordered_set sstables_requiring_cleanup; + owned_ranges_ptr owned_ranges_ptr; + + explicit compaction_state(table_state& t); + compaction_state(compaction_state&&) = delete; + ~compaction_state(); + + bool compaction_disabled() const noexcept { + return compaction_disabled_counter > 0; + } +}; + +} // namespace compaction_manager diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index afea050cb2..6342b6f203 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -15,11 +15,6 @@ #include "compaction_descriptor.hh" #include "tombstone_gc.hh" -namespace compaction { -class table_state; -class strategy_control; -} - namespace sstables { class sstable_set_impl; diff --git a/compaction/strategy_control.hh b/compaction/strategy_control.hh index 7e4b3f9a4d..bf7231198e 100644 --- a/compaction/strategy_control.hh +++ b/compaction/strategy_control.hh @@ -9,9 +9,9 @@ #pragma once -namespace compaction { +#include "compaction/compaction_fwd.hh" -class table_state; +namespace compaction { // Used by manager to set goals and constraints on compaction strategies class strategy_control { diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index e9e4f5a6b1..309327019b 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -11,13 +11,10 @@ #include "compaction/compaction_backlog_manager.hh" #include "compaction/compaction_strategy_state.hh" #include "sstables/sstable_set.hh" +#include "compaction/compaction_fwd.hh" #pragma once -namespace compaction { -class table_state; -} - namespace replica { using enable_backlog_tracker = bool_class; diff --git a/replica/database.hh b/replica/database.hh index 390867d805..714b2eb10b 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -63,6 +63,7 @@ #include "db/rate_limiter.hh" #include "db/operation_type.hh" 
#include "utils/serialized_action.hh" +#include "compaction/compaction_fwd.hh" #include "compaction/compaction_manager.hh" #include "utils/disk-error-handler.hh" #include "rust/wasmtime_bindings.hh" @@ -98,10 +99,6 @@ class directory_semaphore; } -namespace compaction { -class table_state; -} - namespace ser { template class serializer; @@ -549,7 +546,7 @@ private: // Select a compaction group from a given key. compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept; // Select a compaction group from a given sstable based on its token range. - compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) noexcept; + compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept; // Returns a list of all compaction groups. const std::vector>& compaction_groups() const noexcept; // Safely iterate through compaction groups, while performing async operations on them. @@ -1125,6 +1122,15 @@ public: // Safely iterate through table states, while performing async operations on them. future<> parallel_foreach_table_state(std::function(compaction::table_state&)> action); + // Add sst to or remove it from the sstables_requiring_cleanup set. + bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, compaction::owned_ranges_ptr owned_ranges_ptr); + + // Returns true if the sstable requires cleanup. + bool requires_cleanup(const sstables::shared_sstable& sst) const; + + // Returns true if any of the sstables requires cleanup. + bool requires_cleanup(const sstables::sstable_set& set) const; + friend class compaction_group; }; diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 2b31a6fc51..49e76640d1 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -145,11 +145,11 @@ struct reshard_shard_descriptor { } }; -// Collects shared SSTables from all shards and returns a vector containing them all. 
+// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all. // This function assumes that the list of SSTables can be fairly big so it is careful to // manipulate it in a do_for_each loop (which yields) instead of using standard accumulators. future -collect_all_shared_sstables(sharded& dir) { +collect_all_shared_sstables(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) { auto info_vec = sstables::sstable_directory::sstable_open_info_vector(); // We want to make sure that each distributed object reshards about the same amount of data. @@ -161,10 +161,28 @@ collect_all_shared_sstables(sharded& dir) { auto coordinator = this_shard_id(); // We will first move all of the foreign open info to temporary storage so that we can sort // them. We want to distribute bigger sstables first. - co_await dir.invoke_on_all([&info_vec, coordinator] (sstables::sstable_directory& d) -> future<> { + co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> { + auto shared_sstables = d.retrieve_shared_sstables(); + sstables::sstable_directory::sstable_open_info_vector need_cleanup; + if (owned_ranges_ptr) { + auto& table = db.local().find_column_family(ks_name, table_name); + co_await d.do_for_each_sstable([&] (sstables::shared_sstable sst) -> future<> { + if (table.update_sstable_cleanup_state(sst, owned_ranges_ptr)) { + need_cleanup.push_back(co_await sst->get_open_info()); + } + }); + } + if (shared_sstables.empty() && need_cleanup.empty()) { + co_return; + } co_await smp::submit_to(coordinator, [&] () -> future<> { - for (auto& info : d.retrieve_shared_sstables()) { - info_vec.push_back(std::move(info)); + info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size()); + for (auto& info : shared_sstables) { + info_vec.emplace_back(std::move(info)); + co_await coroutine::maybe_yield(); + } + for (auto& info : need_cleanup) { + 
info_vec.emplace_back(std::move(info)); co_await coroutine::maybe_yield(); } }); @@ -207,7 +225,7 @@ distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector so // A creator function must be passed that will create an SSTable object in the correct shard, // and an I/O priority must be specified. future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table, - sstables::compaction_sstable_creator_fn creator, io_priority_class iop) + sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { // Resharding doesn't like empty sstable sets, so bail early. There is nothing // to reshard in this shard. @@ -221,9 +239,14 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job; auto sstables_per_job = shared_info.size() / num_jobs; - std::vector> buckets(1); - co_await coroutine::parallel_for_each(shared_info, [&dir, sstables_per_job, num_jobs, &buckets] (sstables::foreign_sstable_open_info& info) -> future<> { + std::vector> buckets; + buckets.reserve(num_jobs); + buckets.emplace_back(); + co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> { auto sst = co_await dir.load_foreign_sstable(info); + if (owned_ranges_ptr) { + table.update_sstable_cleanup_state(sst, owned_ranges_ptr); + } // Last bucket gets leftover SSTables if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) { buckets.emplace_back(); @@ -233,13 +256,15 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: // There is a semaphore inside the compaction manager in run_resharding_jobs. So we // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. 
But only one will run in parallel at a time - co_await coroutine::parallel_for_each(buckets, [&dir, &table, creator = std::move(creator), iop] (std::vector& sstlist) mutable { - return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&dir, &table, creator, &sstlist, iop] (sstables::compaction_data& info) -> future<> { + auto& t = table.as_table_state(); + co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { + return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> { sstables::compaction_descriptor desc(sstlist, iop); desc.options = sstables::compaction_type_options::make_reshard(); - desc.creator = std::move(creator); + desc.creator = creator; + desc.owned_ranges = owned_ranges_ptr; - auto result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state()); + auto result = co_await sstables::compact_sstables(std::move(desc), info, t); // input sstables are moved, to guarantee their resources are released once we're done // resharding them. 
co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result(); @@ -249,7 +274,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: future<> run_resharding_jobs(sharded& dir, std::vector reshard_jobs, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, - io_priority_class iop) { + io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0)); if (total_size == 0) { @@ -262,7 +287,12 @@ future<> run_resharding_jobs(sharded& dir, std::vec co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> { auto& table = db.local().find_column_family(ks_name, table_name); auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec); - co_await ::replica::reshard(d, std::move(info_vec), table, creator, iop); + // make shard-local copy of owned_ranges + compaction::owned_ranges_ptr local_owned_ranges_ptr; + if (owned_ranges_ptr) { + local_owned_ranges_ptr = make_lw_shared(*owned_ranges_ptr); + } + co_await ::replica::reshard(d, std::move(info_vec), table, creator, iop, std::move(local_owned_ranges_ptr)); co_await d.move_foreign_sstables(dir); })); @@ -276,10 +306,10 @@ future<> run_resharding_jobs(sharded& dir, std::vec // - The second part calls each shard's distributed object to reshard the SSTables they were // assigned. 
future<> -distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop) { - auto all_jobs = co_await collect_all_shared_sstables(dir); +distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { + auto all_jobs = co_await collect_all_shared_sstables(dir, db, ks_name, table_name, owned_ranges_ptr); auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs)); - co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop); + co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop, std::move(owned_ranges_ptr)); } future @@ -451,9 +481,20 @@ distributed_loader::process_upload_dir(distributed& db, distr return sstables::generation_type(gen); }; + // Pass owned_ranges_ptr to reshard to piggy-back cleanup on the resharding compaction. + // Note that needs_cleanup() is inaccurate and may return false positives, + // maybe triggering resharding+cleanup unnecessarily for some sstables. + // But this is resharding on refresh (sstable loading via upload dir), + // which will usually require resharding anyway. 
+ // + // FIXME: take multiple compaction groups into account + // - segregate resharded tables into compaction groups + // - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction + // so that cleanup can be considered per compaction group + auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(ks)); reshard(directory, db, ks, cf, [&] (shard_id shard) mutable { return make_sstable(*global_table, upload, new_generation_for_shard(shard)); - }, service::get_local_streaming_priority()).get(); + }, service::get_local_streaming_priority(), owned_ranges_ptr).get(); reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [&] (shard_id shard) { return make_sstable(*global_table, upload, new_generation_for_shard(shard)); diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 44fdc6c43f..942aca160c 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -69,7 +69,7 @@ class distributed_loader { static future<> reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, std::function filter, io_priority_class iop); static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, - io_priority_class iop); + io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr); static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); static future make_sstables_available(sstables::sstable_directory& dir, diff --git a/replica/table.cc b/replica/table.cc index ed1be6f8ad..e8483102e5 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -557,7 +557,7 @@ compaction_group& table::compaction_group_for_key(partition_key_view key, const return 
compaction_group_for_token(dht::get_token(*s, key)); } -compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) noexcept { +compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept { // FIXME: a sstable can belong to more than one group, change interface to reflect that. return compaction_group_for_token(sst->get_first_decorated_key().token()); } @@ -1910,11 +1910,12 @@ future table::discard_sstables(db_clock::time_point truncat tlogger.debug("cleaning out row cache"); })); rebuild_statistics(); - co_await coroutine::parallel_for_each(p->remove, [p] (pruner::removed_sstable& r) { + co_await coroutine::parallel_for_each(p->remove, [this, p] (pruner::removed_sstable& r) -> future<> { if (r.enable_backlog_tracker) { remove_sstable_from_backlog_tracker(r.cg.get_backlog_tracker(), r.sst); } - return sstables::sstable_directory::delete_atomically({r.sst}); + co_await sstables::sstable_directory::delete_atomically({r.sst}); + update_sstable_cleanup_state(r.sst, {}); }); co_return p->rp; } @@ -2835,4 +2836,22 @@ table::as_data_dictionary() const { return _impl.wrap(*this); } +bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, compaction::owned_ranges_ptr owned_ranges_ptr) { + // FIXME: it's possible that the sstable belongs to multiple compaction_groups + auto& cg = compaction_group_for_sstable(sst); + return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, std::move(owned_ranges_ptr)); +} + +bool table::requires_cleanup(const sstables::shared_sstable& sst) const { + auto& cg = compaction_group_for_sstable(sst); + return get_compaction_manager().requires_cleanup(cg.as_table_state(), sst); +} + +bool table::requires_cleanup(const sstables::sstable_set& set) const { + return bool(set.for_each_sstable_until([this] (const sstables::shared_sstable &sst) { + auto& cg = compaction_group_for_sstable(sst); + return 
stop_iteration(_compaction_manager.requires_cleanup(cg.as_table_state(), sst)); + })); +} + } // namespace replica diff --git a/sstables/shared_sstable.hh b/sstables/shared_sstable.hh index 4bd1d355dd..a8c47881ac 100644 --- a/sstables/shared_sstable.hh +++ b/sstables/shared_sstable.hh @@ -12,6 +12,9 @@ #include #include #include + +#include + #include namespace sstables { @@ -35,6 +38,22 @@ namespace sstables { using shared_sstable = seastar::lw_shared_ptr; using sstable_list = std::unordered_set; +std::string to_string(const shared_sstable& sst, bool include_origin = true); + +} // namespace sstables + +template <> +struct fmt::formatter : fmt::formatter { + template + auto format(const sstables::shared_sstable& sst, FormatContext& ctx) const { + return fmt::format_to(ctx.out(), "{}", sstables::to_string(sst)); + } +}; + +namespace std { + +inline std::ostream& operator<<(std::ostream& os, const sstables::shared_sstable& sst) { + return os << fmt::format("{}", sst); } - +} // namespace std diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 93349dfe6e..c21bbcb2cc 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -162,7 +162,14 @@ sstable_set::all() const { } void sstable_set::for_each_sstable(std::function func) const { - return _impl->for_each_sstable(std::move(func)); + _impl->for_each_sstable_until([func = std::move(func)] (const shared_sstable& sst) { + func(sst); + return stop_iteration::no; + }); +} + +stop_iteration sstable_set::for_each_sstable_until(std::function func) const { + return _impl->for_each_sstable_until(std::move(func)); } void @@ -315,10 +322,13 @@ lw_shared_ptr partitioned_sstable_set::all() const { return _all; } -void partitioned_sstable_set::for_each_sstable(std::function func) const { +stop_iteration partitioned_sstable_set::for_each_sstable_until(std::function func) const { for (auto& sst : *_all) { - func(sst); + if (func(sst)) { + return stop_iteration::yes; + } } + return stop_iteration::no; } 
void partitioned_sstable_set::insert(shared_sstable sst) { @@ -451,10 +461,13 @@ time_series_sstable_set::size() const noexcept { return _sstables->size(); } -void time_series_sstable_set::for_each_sstable(std::function func) const { +stop_iteration time_series_sstable_set::for_each_sstable_until(std::function func) const { for (auto& entry : *_sstables) { - func(entry.second); + if (func(entry.second)) { + return stop_iteration::yes; + } } + return stop_iteration::no; } // O(log n) @@ -1022,12 +1035,13 @@ lw_shared_ptr compound_sstable_set::all() const { return ret; } -void compound_sstable_set::for_each_sstable(std::function func) const { +stop_iteration compound_sstable_set::for_each_sstable_until(std::function func) const { for (auto& set : _sets) { - set->for_each_sstable([&func] (const shared_sstable& sst) { - func(sst); - }); + if (set->for_each_sstable_until([&func] (const shared_sstable& sst) { return func(sst); })) { + return stop_iteration::yes; + } } + return stop_iteration::no; } void compound_sstable_set::insert(shared_sstable sst) { @@ -1219,7 +1233,7 @@ flat_mutation_reader_v2 sstable_set::make_crawling_reader( tracing::trace_state_ptr trace_ptr, read_monitor_generator& monitor_generator) const { std::vector readers; - _impl->for_each_sstable([&] (const shared_sstable& sst) mutable { + for_each_sstable([&] (const shared_sstable& sst) mutable { readers.emplace_back(sst->make_crawling_reader(schema, permit, pc, trace_ptr, monitor_generator(sst))); }); return make_combined_reader(schema, std::move(permit), std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no); diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 7dc4d9f486..067f53c657 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -66,6 +66,9 @@ public: lw_shared_ptr all() const; // Prefer for_each_sstable() over all() for iteration purposes, as the latter may have to copy all sstables into a temporary void 
for_each_sstable(std::function func) const; + // Calls func for each sstable or until it returns stop_iteration::yes + // Returns the last stop_iteration value. + stop_iteration for_each_sstable_until(std::function func) const; void insert(shared_sstable sst); void erase(shared_sstable sst); size_t size() const noexcept; diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 17c61509d3..3ee17ef390 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -30,7 +30,7 @@ public: virtual std::vector select(const dht::partition_range& range) const = 0; virtual std::vector select_sstable_runs(const std::vector& sstables) const; virtual lw_shared_ptr all() const = 0; - virtual void for_each_sstable(std::function func) const = 0; + virtual stop_iteration for_each_sstable_until(std::function func) const = 0; virtual void insert(shared_sstable sst) = 0; virtual void erase(shared_sstable sst) = 0; virtual size_t size() const noexcept = 0; @@ -95,7 +95,7 @@ public: virtual std::vector select(const dht::partition_range& range) const override; virtual std::vector select_sstable_runs(const std::vector& sstables) const override; virtual lw_shared_ptr all() const override; - virtual void for_each_sstable(std::function func) const override; + virtual stop_iteration for_each_sstable_until(std::function func) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; virtual size_t size() const noexcept override; @@ -121,7 +121,7 @@ public: virtual std::unique_ptr clone() const override; virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual lw_shared_ptr all() const override; - virtual void for_each_sstable(std::function func) const override; + virtual stop_iteration for_each_sstable_until(std::function func) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; 
virtual size_t size() const noexcept override; @@ -161,7 +161,7 @@ public: virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual std::vector select_sstable_runs(const std::vector& sstables) const override; virtual lw_shared_ptr all() const override; - virtual void for_each_sstable(std::function func) const override; + virtual stop_iteration for_each_sstable_until(std::function func) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; virtual size_t size() const noexcept override; diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 6c530cf89f..b0ab011dbf 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3608,6 +3608,12 @@ future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir) { } } +std::string to_string(const shared_sstable& sst, bool include_origin) { + return include_origin ? + fmt::format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin()) : + fmt::format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level()); +} + } // namespace sstables namespace seastar { diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 18d8855d47..d57268650a 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -1998,7 +1998,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { auto local_ranges = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name)); auto descriptor = sstables::compaction_descriptor({std::move(sst)}, default_priority_class(), compaction_descriptor::default_level, - compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(std::move(local_ranges))); + compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(), std::move(local_ranges)); auto ret = 
compact_sstables(std::move(descriptor), cf, sst_gen).get0(); BOOST_REQUIRE(ret.new_sstables.size() == 1); @@ -3571,23 +3571,23 @@ SEASTAR_TEST_CASE(sstable_needs_cleanup_test) { { auto local_ranges = { token_range(0, 9) }; auto sst = sst_gen(keys[0], keys[9]); - BOOST_REQUIRE(!needs_cleanup(sst, local_ranges, s)); + BOOST_REQUIRE(!needs_cleanup(sst, local_ranges)); } { auto local_ranges = { token_range(0, 1), token_range(3, 4), token_range(5, 6) }; auto sst = sst_gen(keys[0], keys[1]); - BOOST_REQUIRE(!needs_cleanup(sst, local_ranges, s)); + BOOST_REQUIRE(!needs_cleanup(sst, local_ranges)); auto sst2 = sst_gen(keys[2], keys[2]); - BOOST_REQUIRE(needs_cleanup(sst2, local_ranges, s)); + BOOST_REQUIRE(needs_cleanup(sst2, local_ranges)); auto sst3 = sst_gen(keys[0], keys[6]); - BOOST_REQUIRE(needs_cleanup(sst3, local_ranges, s)); + BOOST_REQUIRE(needs_cleanup(sst3, local_ranges)); auto sst5 = sst_gen(keys[7], keys[7]); - BOOST_REQUIRE(needs_cleanup(sst5, local_ranges, s)); + BOOST_REQUIRE(needs_cleanup(sst5, local_ranges)); } }); } @@ -4886,3 +4886,28 @@ SEASTAR_TEST_CASE(compaction_manager_stop_and_drain_race_test) { testlog.info("stopping compaction manager"); co_await cm.stop(); } + +SEASTAR_TEST_CASE(test_print_shared_sstables_vector) { + return test_env::do_with_async([] (test_env& env) { + simple_schema ss; + auto s = ss.schema(); + auto pks = ss.make_pkeys(2); + auto sst_gen = env.make_sst_factory(s); + + std::vector ssts(2); + + auto mut0 = mutation(s, pks[0]); + mut0.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp()); + ssts[0] = make_sstable_containing(sst_gen, {std::move(mut0)}); + + auto mut1 = mutation(s, pks[1]); + mut1.partition().apply_insert(*s, ss.make_ckey(1), ss.new_timestamp()); + ssts[1] = make_sstable_containing(sst_gen, {std::move(mut1)}); + + std::string msg = format("{}", ssts); + for (const auto& sst : ssts) { + auto gen_str = format("{}", sst->generation()); + BOOST_REQUIRE(msg.find(gen_str) != std::string::npos); + } + }); +}