From 09df04c919062b55b94c794e79f136c0f83cda84 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 27 Dec 2022 14:45:12 +0200 Subject: [PATCH 01/24] compaction: move owned_ranges into descriptor Move the owned_ranges_ptr, currently used only by cleanup and upgrade compactions, to the generic compaction descriptor so we apply cleanup in other compaction types. Signed-off-by: Benny Halevy --- compaction/compaction.cc | 23 +++++++++-------------- compaction/compaction_descriptor.hh | 16 +++++++++------- compaction/compaction_manager.cc | 26 ++++++++++++++++---------- compaction/compaction_manager.hh | 4 ++-- test/boost/sstable_compaction_test.cc | 2 +- 5 files changed, 37 insertions(+), 34 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index bac75f9441..5644c389a0 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -475,6 +475,8 @@ protected: // used to incrementally calculate max purgeable timestamp, as we iterate through decorated keys. std::optional _selector; std::unordered_set _compacting_for_max_purgeable_func; + // optional owned_ranges vector for cleanup; + owned_ranges_ptr _owned_ranges = {}; // Garbage collected sstables that are sealed but were not added to SSTable set yet. std::vector _unused_garbage_collected_sstables; // Garbage collected sstables that were added to SSTable set and should be eventually removed from it. @@ -503,6 +505,7 @@ protected: , _sstable_set(std::move(descriptor.all_sstables_snapshot)) , _selector(_sstable_set ? 
_sstable_set->make_incremental_selector() : std::optional{}) , _compacting_for_max_purgeable_func(std::unordered_set(_sstables.begin(), _sstables.end())) + , _owned_ranges(std::move(descriptor.owned_ranges)) { for (auto& sst : _sstables) { _stats_collector.update(sst->get_encoding_stats_for_compaction()); @@ -1175,7 +1178,6 @@ private: }; class cleanup_compaction final : public regular_compaction { - owned_ranges_ptr _owned_ranges; dht::incremental_owned_ranges_checker _owned_ranges_checker; private: // Called in a seastar thread @@ -1199,20 +1201,13 @@ protected: return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)}; } -private: - cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, owned_ranges_ptr owned_ranges) +public: + cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata) : regular_compaction(table_s, std::move(descriptor), cdata) - , _owned_ranges(std::move(owned_ranges)) , _owned_ranges_checker(*_owned_ranges) { } -public: - cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::cleanup opts) - : cleanup_compaction(table_s, std::move(descriptor), cdata, std::move(opts.owned_ranges)) {} - cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_type_options::upgrade opts) - : cleanup_compaction(table_s, std::move(descriptor), cdata, std::move(opts.owned_ranges)) {} - flat_mutation_reader_v2 make_sstable_reader() const override { return make_filtering_reader(regular_compaction::make_sstable_reader(), make_partition_filter()); } @@ -1710,11 +1705,11 @@ static std::unique_ptr make_compaction(table_state& table_s, sstable std::unique_ptr operator()(compaction_type_options::regular) { return std::make_unique(table_s, std::move(descriptor), cdata); } - std::unique_ptr 
operator()(compaction_type_options::cleanup options) { - return std::make_unique(table_s, std::move(descriptor), cdata, std::move(options)); + std::unique_ptr operator()(compaction_type_options::cleanup) { + return std::make_unique(table_s, std::move(descriptor), cdata); } - std::unique_ptr operator()(compaction_type_options::upgrade options) { - return std::make_unique(table_s, std::move(descriptor), cdata, std::move(options)); + std::unique_ptr operator()(compaction_type_options::upgrade) { + return std::make_unique(table_s, std::move(descriptor), cdata); } std::unique_ptr operator()(compaction_type_options::scrub scrub_options) { return std::make_unique(table_s, std::move(descriptor), cdata, scrub_options); diff --git a/compaction/compaction_descriptor.hh b/compaction/compaction_descriptor.hh index 28727a6102..ec1361f726 100644 --- a/compaction/compaction_descriptor.hh +++ b/compaction/compaction_descriptor.hh @@ -64,10 +64,8 @@ public: struct regular { }; struct cleanup { - compaction::owned_ranges_ptr owned_ranges; }; struct upgrade { - compaction::owned_ranges_ptr owned_ranges; }; struct scrub { enum class mode { @@ -112,12 +110,12 @@ public: return compaction_type_options(regular{}); } - static compaction_type_options make_cleanup(compaction::owned_ranges_ptr owned_ranges) { - return compaction_type_options(cleanup{std::move(owned_ranges)}); + static compaction_type_options make_cleanup() { + return compaction_type_options(cleanup{}); } - static compaction_type_options make_upgrade(compaction::owned_ranges_ptr owned_ranges) { - return compaction_type_options(upgrade{std::move(owned_ranges)}); + static compaction_type_options make_upgrade() { + return compaction_type_options(upgrade{}); } static compaction_type_options make_scrub(scrub::mode mode) { @@ -160,6 +158,8 @@ struct compaction_descriptor { // The options passed down to the compaction code. // This also selects the kind of compaction to do. 
compaction_type_options options = compaction_type_options::make_regular(); + // If engaged, compaction will cleanup the input sstables by skipping non-owned ranges. + compaction::owned_ranges_ptr owned_ranges; compaction_sstable_creator_fn creator; compaction_sstable_replacer_fn replacer; @@ -179,12 +179,14 @@ struct compaction_descriptor { int level = default_level, uint64_t max_sstable_bytes = default_max_sstable_bytes, run_id run_identifier = run_id::create_random_id(), - compaction_type_options options = compaction_type_options::make_regular()) + compaction_type_options options = compaction_type_options::make_regular(), + compaction::owned_ranges_ptr owned_ranges_ = {}) : sstables(std::move(sstables)) , level(level) , max_sstable_bytes(max_sstable_bytes) , run_identifier(run_identifier) , options(options) + , owned_ranges(std::move(owned_ranges_)) , io_priority(io_priority) {} diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index c6c9116491..f6ecd5eb15 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1268,15 +1268,17 @@ namespace compaction { class rewrite_sstables_compaction_task_executor : public sstables_task_executor { sstables::compaction_type_options _options; + owned_ranges_ptr _owned_ranges_ptr; compacting_sstable_registration _compacting; compaction_manager::can_purge_tombstones _can_purge; public: - rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, + rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, std::vector sstables, compacting_sstable_registration compacting, compaction_manager::can_purge_tombstones can_purge) : sstables_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables)) , _options(std::move(options)) + , 
_owned_ranges_ptr(std::move(owned_ranges_ptr)) , _compacting(std::move(compacting)) , _can_purge(can_purge) {} @@ -1308,7 +1310,7 @@ private: auto run_identifier = sst->run_identifier(); // FIXME: this compaction should run with maintenance priority. auto descriptor = sstables::compaction_descriptor({ sst }, service::get_local_compaction_priority(), - sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options); + sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options, _owned_ranges_ptr); // Releases reference to cleaned sstable such that respective used disk space can be freed. auto release_exhausted = [this] (const std::vector& exhausted_sstables) { @@ -1343,7 +1345,7 @@ private: template requires std::derived_from -future compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { +future compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, Args... 
args) { if (_state != state::enabled) { co_return std::nullopt; } @@ -1367,11 +1369,11 @@ future compaction_manager::perform_tas return a->data_size() > b->data_size(); }); }); - co_return co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); + co_return co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(owned_ranges_ptr), std::move(sstables), std::move(compacting), std::forward(args)...)); } -future compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { - return perform_task_on_all_files(t, std::move(options), std::move(get_func), can_purge); +future compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, can_purge_tombstones can_purge) { + return perform_task_on_all_files(t, std::move(options), std::move(owned_ranges_ptr), std::move(get_func), can_purge); } namespace compaction { @@ -1452,13 +1454,15 @@ namespace compaction { class cleanup_sstables_compaction_task_executor : public compaction_task_executor { const sstables::compaction_type_options _cleanup_options; + owned_ranges_ptr _owned_ranges_ptr; compacting_sstable_registration _compacting; std::vector _pending_cleanup_jobs; public: - cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, + cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, std::vector candidates, compacting_sstable_registration compacting) : compaction_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type()))) , _cleanup_options(std::move(options)) + , _owned_ranges_ptr(std::move(owned_ranges_ptr)) , 
_compacting(std::move(compacting)) , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(*t, std::move(candidates))) { @@ -1479,6 +1483,7 @@ protected: while (!_pending_cleanup_jobs.empty() && can_proceed()) { auto active_job = std::move(_pending_cleanup_jobs.back()); active_job.options = _cleanup_options; + active_job.owned_ranges = _owned_ranges_ptr; co_await run_cleanup_job(std::move(active_job)); _pending_cleanup_jobs.pop_back(); _cm._stats.pending_tasks--; @@ -1568,7 +1573,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range }); }; - co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), + co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(), std::move(sorted_owned_ranges), std::move(get_sstables)); } @@ -1597,7 +1602,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own // Note that we potentially could be doing multiple // upgrades here in parallel, but that is really the users // problem. - return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(std::move(sorted_owned_ranges)), std::move(get_sstables)).discard_result(); + return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables)).discard_result(); } // Submit a table to be scrubbed and wait for its termination. 
@@ -1606,7 +1611,8 @@ future compaction_manager::perform_sst if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(t); } - return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), [&t, opts] { + owned_ranges_ptr owned_ranges_ptr = {}; + return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] { auto all_sstables = get_all_sstables(t); std::vector sstables = boost::copy_range>(all_sstables | boost::adaptors::filtered([&opts] (const sstables::shared_sstable& sst) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index c8e9bd32a9..da0f39ee43 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -236,9 +236,9 @@ private: // by retrieving set of candidates only after all compactions for table T were stopped, if any. template requires std::derived_from - future perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args); + future perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, owned_ranges_ptr, get_candidates_func, Args... args); - future rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); + future rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, owned_ranges_ptr, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); // Stop all fibers, without waiting. Safe to be called multiple times. 
void do_stop() noexcept; diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 18d8855d47..92da0d4290 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -1998,7 +1998,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { auto local_ranges = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name)); auto descriptor = sstables::compaction_descriptor({std::move(sst)}, default_priority_class(), compaction_descriptor::default_level, - compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(std::move(local_ranges))); + compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(), std::move(local_ranges)); auto ret = compact_sstables(std::move(descriptor), cf, sst_gen).get0(); BOOST_REQUIRE(ret.new_sstables.size() == 1); From 0c6ce5af7428f79536415cda22107a3bd82eaa34 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 27 Dec 2022 15:01:52 +0200 Subject: [PATCH 02/24] compaction: move owned ranges filtering to base class Move the token filtering logic down from cleanup_compaction to regular_compaction and class compaction so it can be reused by other compaction types. Create a _owned_ranges_checker in class compaction when _owned_ranges is engaged, and use it in compaction::setup to filter partitions based on the owned ranges. 
Ref scylladb/scylladb#12998 Signed-off-by: Benny Halevy --- compaction/compaction.cc | 53 +++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 5644c389a0..83feb90dfd 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -477,6 +477,7 @@ protected: std::unordered_set _compacting_for_max_purgeable_func; // optional owned_ranges vector for cleanup; owned_ranges_ptr _owned_ranges = {}; + std::optional _owned_ranges_checker; // Garbage collected sstables that are sealed but were not added to SSTable set yet. std::vector _unused_garbage_collected_sstables; // Garbage collected sstables that were added to SSTable set and should be eventually removed from it. @@ -506,6 +507,7 @@ protected: , _selector(_sstable_set ? _sstable_set->make_incremental_selector() : std::optional{}) , _compacting_for_max_purgeable_func(std::unordered_set(_sstables.begin(), _sstables.end())) , _owned_ranges(std::move(descriptor.owned_ranges)) + , _owned_ranges_checker(_owned_ranges ? std::optional(*_owned_ranges) : std::nullopt) { for (auto& sst : _sstables) { _stats_collector.update(sst->get_encoding_stats_for_compaction()); @@ -624,6 +626,21 @@ protected: bool enable_garbage_collected_sstable_writer() const noexcept { return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits::max(); } + + flat_mutation_reader_v2::filter make_partition_filter() const { + return [this] (const dht::decorated_key& dk) { +#ifdef SEASTAR_DEBUG + // sstables should never be shared with other shards at this point. 
+ assert(dht::shard_of(*_schema, dk.token()) == this_shard_id()); +#endif + + if (!_owned_ranges_checker->belongs_to_current_node(dk.token())) { + log_trace("Token {} does not belong to this node, skipping", dk.token()); + return false; + } + return true; + }; + } public: compaction& operator=(const compaction&) = delete; compaction(const compaction&) = delete; @@ -637,6 +654,17 @@ private: // Default range sstable reader that will only return mutation that belongs to current shard. virtual flat_mutation_reader_v2 make_sstable_reader() const = 0; + // Make a filtering reader if needed + // FIXME: the sstable reader itself should be pass the owned ranges + // so it can skip over the disowned ranges efficiently using the index. + // Ref https://github.com/scylladb/scylladb/issues/12998 + flat_mutation_reader_v2 setup_sstable_reader() const { + if (!_owned_ranges_checker) { + return make_sstable_reader(); + } + return make_filtering_reader(make_sstable_reader(), make_partition_filter()); + } + virtual sstables::sstable_set make_sstable_set_for_input() const { return _table_s.get_compaction_strategy().make_sstable_set(_schema); } @@ -706,7 +734,7 @@ private: }); }); const auto& gc_state = _table_s.get_tombstone_gc_state(); - return consumer(make_compacting_reader(make_sstable_reader(), compaction_time, max_purgeable_func(), gc_state)); + return consumer(make_compacting_reader(setup_sstable_reader(), compaction_time, max_purgeable_func(), gc_state)); } future<> consume() { @@ -742,7 +770,7 @@ private: reader.consume_in_thread(std::move(cfc)); }); }); - return consumer(make_sstable_reader()); + return consumer(setup_sstable_reader()); } virtual reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) { @@ -1178,7 +1206,6 @@ private: }; class cleanup_compaction final : public regular_compaction { - dht::incremental_owned_ranges_checker _owned_ranges_checker; private: // Called in a seastar thread dht::partition_range_vector @@ -1204,14 +1231,9 @@ 
protected: public: cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata) : regular_compaction(table_s, std::move(descriptor), cdata) - , _owned_ranges_checker(*_owned_ranges) { } - flat_mutation_reader_v2 make_sstable_reader() const override { - return make_filtering_reader(regular_compaction::make_sstable_reader(), make_partition_filter()); - } - std::string_view report_start_desc() const override { return "Cleaning"; } @@ -1219,21 +1241,6 @@ public: std::string_view report_finish_desc() const override { return "Cleaned"; } - - flat_mutation_reader_v2::filter make_partition_filter() const { - return [this] (const dht::decorated_key& dk) { -#ifdef SEASTAR_DEBUG - // sstables should never be shared with other shards at this point. - assert(dht::shard_of(*_schema, dk.token()) == this_shard_id()); -#endif - - if (!_owned_ranges_checker.belongs_to_current_node(dk.token())) { - log_trace("Token {} does not belong to this node, skipping", dk.token()); - return false; - } - return true; - }; - } }; class scrub_compaction final : public regular_compaction { From 7c9d16ff96dd1429ff7e2c3e8e7e50b72db7c6d3 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 30 Dec 2022 09:58:11 +0200 Subject: [PATCH 03/24] distributed_loader: reshard: reserve num_jobs buckets We know in advance how many buckets we need. We still need to emplace the first bucket upfront. 
Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index e1815fb7ac..5ce06a009e 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -221,7 +221,9 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job; auto sstables_per_job = shared_info.size() / num_jobs; - std::vector> buckets(1); + std::vector> buckets; + buckets.reserve(num_jobs); + buckets.emplace_back(); co_await coroutine::parallel_for_each(shared_info, [&dir, sstables_per_job, num_jobs, &buckets] (sstables::foreign_sstable_open_info& info) -> future<> { auto sst = co_await dir.load_foreign_sstable(info); // Last bucket gets leftover SSTables From c6b7fcc26f41d523d2f982940877ff8affe486a2 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 30 Dec 2022 10:13:28 +0200 Subject: [PATCH 04/24] distributed_loader: reshard: capture creator by ref Now that reshard is a coroutine, creator is preserved in the coroutine frame until completion so we can simply capture it by reference now. Note that previously it was moved into the compaction descriptor, but the capture wasn't mutable so it was copied anyhow and this change doesn't introduce a regression. Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 5ce06a009e..4d74c269e3 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -235,11 +235,11 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: // There is a semaphore inside the compaction manager in run_resharding_jobs. So we // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. 
But only one will run in parallel at a time - co_await coroutine::parallel_for_each(buckets, [&dir, &table, creator = std::move(creator), iop] (std::vector& sstlist) mutable { + co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&dir, &table, creator, &sstlist, iop] (sstables::compaction_data& info) -> future<> { sstables::compaction_descriptor desc(sstlist, iop); desc.options = sstables::compaction_type_options::make_reshard(); - desc.creator = std::move(creator); + desc.creator = creator; auto result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state()); // input sstables are moved, to guarantee their resources are released once we're done From f540af930b8583bae98c58490307684ae930e7d7 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 30 Dec 2022 09:59:03 +0200 Subject: [PATCH 05/24] distributed_loader: reshard: get a ref to table_state We don't reference the table itself, only as_table_state. Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 4d74c269e3..8d71bc5e30 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -235,13 +235,14 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: // There is a semaphore inside the compaction manager in run_resharding_jobs. So we // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. 
But only one will run in parallel at a time + auto& t = table.as_table_state(); co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { - return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&dir, &table, creator, &sstlist, iop] (sstables::compaction_data& info) -> future<> { + return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> { sstables::compaction_descriptor desc(sstlist, iop); desc.options = sstables::compaction_type_options::make_reshard(); desc.creator = creator; - auto result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state()); + auto result = co_await sstables::compact_sstables(std::move(desc), info, t); // input sstables are moved, to guarantee their resources are released once we're done // resharding them. co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result(); From aa4b18f8fba86d86e958fa678e3d17ee79d465b8 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 27 Dec 2022 13:12:41 +0200 Subject: [PATCH 06/24] distributed_loader: reshard: add optional owned_ranges_ptr param For passing owned_ranges_ptr from distributed_loader::process_upload_dir. 
Refs #11933 Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 16 +++++++++++----- replica/distributed_loader.hh | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 8d71bc5e30..2fc8d92189 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -207,7 +207,7 @@ distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector so // A creator function must be passed that will create an SSTable object in the correct shard, // and an I/O priority must be specified. future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table, - sstables::compaction_sstable_creator_fn creator, io_priority_class iop) + sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { // Resharding doesn't like empty sstable sets, so bail early. There is nothing // to reshard in this shard. 
@@ -241,6 +241,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: sstables::compaction_descriptor desc(sstlist, iop); desc.options = sstables::compaction_type_options::make_reshard(); desc.creator = creator; + desc.owned_ranges = owned_ranges_ptr; auto result = co_await sstables::compact_sstables(std::move(desc), info, t); // input sstables are moved, to guarantee their resources are released once we're done @@ -252,7 +253,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: future<> run_resharding_jobs(sharded& dir, std::vector reshard_jobs, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, - io_priority_class iop) { + io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0)); if (total_size == 0) { @@ -265,7 +266,12 @@ future<> run_resharding_jobs(sharded& dir, std::vec co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> { auto& table = db.local().find_column_family(ks_name, table_name); auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec); - co_await ::replica::reshard(d, std::move(info_vec), table, creator, iop); + // make shard-local copy of owned_ranges + compaction::owned_ranges_ptr local_owned_ranges_ptr; + if (owned_ranges_ptr) { + local_owned_ranges_ptr = make_lw_shared(*owned_ranges_ptr); + } + co_await ::replica::reshard(d, std::move(info_vec), table, creator, iop, std::move(local_owned_ranges_ptr)); co_await d.move_foreign_sstables(dir); })); @@ -279,10 +285,10 @@ future<> run_resharding_jobs(sharded& dir, std::vec // - The second part calls each shard's distributed object to reshard the SSTables they were // assigned. 
future<> -distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop) { +distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { auto all_jobs = co_await collect_all_shared_sstables(dir); auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs)); - co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop); + co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop, std::move(owned_ranges_ptr)); } future diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 44fdc6c43f..942aca160c 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -69,7 +69,7 @@ class distributed_loader { static future<> reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, std::function filter, io_priority_class iop); static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, - io_priority_class iop); + io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr); static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); static future make_sstables_available(sstables::sstable_directory& dir, From 3ccbb28f2adbaa18060a1e291b7040b4cdebf5ec Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 27 Dec 2022 13:12:41 +0200 Subject: [PATCH 07/24] distributed_loader: process_upload_dir: pass owned_ranges_ptr to reshard To facilitate implicit cleanup of 
sstables via resharding. Refs #11933 Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 2fc8d92189..20f8577c93 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -459,9 +459,10 @@ distributed_loader::process_upload_dir(distributed& db, distr return sstables::generation_type(gen); }; + auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(ks)); reshard(directory, db, ks, cf, [&] (shard_id shard) mutable { return make_sstable(*global_table, upload, new_generation_for_shard(shard)); - }, service::get_local_streaming_priority()).get(); + }, service::get_local_streaming_priority(), owned_ranges_ptr).get(); reshape(directory, db, sstables::reshape_mode::strict, ks, cf, [&] (shard_id shard) { return make_sstable(*global_table, upload, new_generation_for_shard(shard)); From ecbd112979ba3dde8ec16203cc6d86a1293cedc7 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 27 Dec 2022 13:12:41 +0200 Subject: [PATCH 08/24] distributed_loader: reshard: consider sstables for cleanup When called from `process_upload_dir` we pass a list of owned tokens to `reshard`. When they are available, run resharding, with implicit cleanup, also on unshared sstables that need cleanup. Fixes #11933 Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 20f8577c93..4959ce01e1 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -145,11 +145,11 @@ struct reshard_shard_descriptor { } }; -// Collects shared SSTables from all shards and returns a vector containing them all. 
+// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all. // This function assumes that the list of SSTables can be fairly big so it is careful to // manipulate it in a do_for_each loop (which yields) instead of using standard accumulators. future -collect_all_shared_sstables(sharded& dir) { +collect_all_shared_sstables(sharded& dir, compaction::owned_ranges_ptr owned_ranges_ptr) { auto info_vec = sstables::sstable_directory::sstable_open_info_vector(); // We want to make sure that each distributed object reshards about the same amount of data. @@ -161,10 +161,28 @@ collect_all_shared_sstables(sharded& dir) { auto coordinator = this_shard_id(); // We will first move all of the foreign open info to temporary storage so that we can sort // them. We want to distribute bigger sstables first. - co_await dir.invoke_on_all([&info_vec, coordinator] (sstables::sstable_directory& d) -> future<> { + co_await dir.invoke_on_all([&info_vec, coordinator, owned_ranges_ptr] (sstables::sstable_directory& d) -> future<> { + auto shared_sstables = d.retrieve_shared_sstables(); + sstables::sstable_directory::sstable_open_info_vector need_cleanup; + if (owned_ranges_ptr) { + const auto& owned_ranges = *owned_ranges_ptr; + co_await d.do_for_each_sstable([&] (sstables::shared_sstable sst) -> future<> { + if (needs_cleanup(sst, owned_ranges, sst->get_schema())) { + need_cleanup.push_back(co_await sst->get_open_info()); + } + }); + } + if (shared_sstables.empty() && need_cleanup.empty()) { + co_return; + } co_await smp::submit_to(coordinator, [&] () -> future<> { - for (auto& info : d.retrieve_shared_sstables()) { - info_vec.push_back(std::move(info)); + info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size()); + for (auto& info : shared_sstables) { + info_vec.emplace_back(std::move(info)); + co_await coroutine::maybe_yield(); + } + for (auto& info : need_cleanup) { + 
info_vec.emplace_back(std::move(info)); co_await coroutine::maybe_yield(); } }); @@ -286,7 +304,7 @@ future<> run_resharding_jobs(sharded& dir, std::vec // assigned. future<> distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { - auto all_jobs = co_await collect_all_shared_sstables(dir); + auto all_jobs = co_await collect_all_shared_sstables(dir, owned_ranges_ptr); auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs)); co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop, std::move(owned_ranges_ptr)); } @@ -459,6 +477,11 @@ distributed_loader::process_upload_dir(distributed& db, distr return sstables::generation_type(gen); }; + // Pass owned_ranges_ptr to reshard to piggy-back cleanup on the resharding compaction. + // Note that needs_cleanup() is inaccurate and may return false positives, + // maybe triggerring resharding+cleanup unnecessarily for some sstables. + // But this is resharding on refresh (sstable loading via upload dir), + // which will usually require resharding anyway. auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(ks)); reshard(directory, db, ks, cf, [&] (shard_id shard) mutable { return make_sstable(*global_table, upload, new_generation_for_shard(shard)); From ac9f8486ba3d11a5c8175a0238bbbd77d363afed Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 30 Dec 2022 15:16:38 +0200 Subject: [PATCH 09/24] compaction_manager: perform_cleanup: disallow empty sorted_owned_ranges I'm not sure why this was originally supported, maybe for upgrade sstables where we may want to rewrite the sstables without filtering any tokens, but perform_sstable_upgrade is now following a different code path and uses `rewrite_sstables` directly, without piggybacking on cleanup.
Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f6ecd5eb15..fc424963a7 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1560,6 +1560,10 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range t.schema()->ks_name(), t.schema()->cf_name())); } + if (sorted_owned_ranges->empty()) { + throw std::runtime_error("cleanup request failed: sorted_owned_ranges is empty"); + } + auto get_sstables = [this, &t, sorted_owned_ranges] () -> future> { return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { auto schema = t.schema(); @@ -1567,7 +1571,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range const auto candidates = get_candidates(t); std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { seastar::thread::maybe_yield(); - return sorted_owned_ranges->empty() || needs_cleanup(sst, *sorted_owned_ranges, schema); + return needs_cleanup(sst, *sorted_owned_ranges, schema); }); return sstables; }); From 1baca96de17dd0651d81a23971855b8455d9ea59 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 20 Jan 2023 15:09:26 +0200 Subject: [PATCH 10/24] compaction_manager: needs_cleanup: delete unused schema param It isn't needed. The sstable already has a schema. 
Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 8 +++----- compaction/compaction_manager.hh | 2 +- replica/distributed_loader.cc | 2 +- test/boost/sstable_compaction_test.cc | 10 +++++----- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index fc424963a7..ce69f971ab 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1528,8 +1528,7 @@ private: } bool needs_cleanup(const sstables::shared_sstable& sst, - const dht::token_range_vector& sorted_owned_ranges, - schema_ptr s) { + const dht::token_range_vector& sorted_owned_ranges) { auto first_token = sst->get_first_decorated_key().token(); auto last_token = sst->get_last_decorated_key().token(); dht::token_range sst_token_range = dht::token_range::make(first_token, last_token); @@ -1566,12 +1565,11 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range auto get_sstables = [this, &t, sorted_owned_ranges] () -> future> { return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { - auto schema = t.schema(); auto sstables = std::vector{}; const auto candidates = get_candidates(t); - std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { + std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges] (const sstables::shared_sstable& sst) { seastar::thread::maybe_yield(); - return needs_cleanup(sst, *sorted_owned_ranges, schema); + return needs_cleanup(sst, *sorted_owned_ranges); }); return sstables; }); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index da0f39ee43..63a46714ad 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -562,7 +562,7 @@ std::ostream& operator<<(std::ostream& os, const compaction::compaction_task_exe 
} -bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s); +bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges); // Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir. std::vector in_strategy_sstables(compaction::table_state& table_s); diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 4959ce01e1..1641e38257 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -167,7 +167,7 @@ collect_all_shared_sstables(sharded& dir, compactio if (owned_ranges_ptr) { const auto& owned_ranges = *owned_ranges_ptr; co_await d.do_for_each_sstable([&] (sstables::shared_sstable sst) -> future<> { - if (needs_cleanup(sst, owned_ranges, sst->get_schema())) { + if (needs_cleanup(sst, owned_ranges)) { need_cleanup.push_back(co_await sst->get_open_info()); } }); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 92da0d4290..4dc622ee32 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3571,23 +3571,23 @@ SEASTAR_TEST_CASE(sstable_needs_cleanup_test) { { auto local_ranges = { token_range(0, 9) }; auto sst = sst_gen(keys[0], keys[9]); - BOOST_REQUIRE(!needs_cleanup(sst, local_ranges, s)); + BOOST_REQUIRE(!needs_cleanup(sst, local_ranges)); } { auto local_ranges = { token_range(0, 1), token_range(3, 4), token_range(5, 6) }; auto sst = sst_gen(keys[0], keys[1]); - BOOST_REQUIRE(!needs_cleanup(sst, local_ranges, s)); + BOOST_REQUIRE(!needs_cleanup(sst, local_ranges)); auto sst2 = sst_gen(keys[2], keys[2]); - BOOST_REQUIRE(needs_cleanup(sst2, local_ranges, s)); + BOOST_REQUIRE(needs_cleanup(sst2, local_ranges)); auto sst3 = sst_gen(keys[0], keys[6]); - BOOST_REQUIRE(needs_cleanup(sst3, local_ranges, s)); + BOOST_REQUIRE(needs_cleanup(sst3, local_ranges)); auto sst5 = sst_gen(keys[7], 
keys[7]); - BOOST_REQUIRE(needs_cleanup(sst5, local_ranges, s)); + BOOST_REQUIRE(needs_cleanup(sst5, local_ranges)); } }); } From d0690b64c16727f8b94730b2e00e3ce8921b9254 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 29 Dec 2022 14:10:48 +0200 Subject: [PATCH 11/24] table, compaction_manager: add update_sstable_cleanup_state update_sstable_cleanup_state calls needs_cleanup and inserts (or erases) the sstable into the respective compaction_state.sstables_requiring_cleanup set. Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 15 +++++++++++++-- compaction/compaction_manager.hh | 6 ++++++ replica/database.hh | 3 +++ replica/distributed_loader.cc | 15 ++++++++++----- replica/table.cc | 6 ++++++ 5 files changed, 38 insertions(+), 7 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index ce69f971ab..ef56590fe3 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1548,6 +1548,17 @@ bool needs_cleanup(const sstables::shared_sstable& sst, return true; } +bool compaction_manager::update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, owned_ranges_ptr owned_ranges_ptr) { + auto& cs = get_compaction_state(&t); + if (owned_ranges_ptr && needs_cleanup(sst, *owned_ranges_ptr)) { + cs.sstables_requiring_cleanup.insert(sst); + return true; + } else { + cs.sstables_requiring_cleanup.erase(sst); + return false; + } +} + future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t) { auto check_for_cleanup = [this, &t] { return boost::algorithm::any_of(_tasks, [&t] (auto& task) { @@ -1567,9 +1578,9 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { auto sstables = std::vector{}; const auto candidates = get_candidates(t); - std::copy_if(candidates.begin(), candidates.end(), 
std::back_inserter(sstables), [&sorted_owned_ranges] (const sstables::shared_sstable& sst) { + std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&] (const sstables::shared_sstable& sst) { seastar::thread::maybe_yield(); - return needs_cleanup(sst, *sorted_owned_ranges); + return update_sstable_cleanup_state(t, sst, sorted_owned_ranges); }); return sstables; }); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 63a46714ad..f90795a6ec 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -22,6 +22,7 @@ #include #include #include "log.hh" +#include "sstables/shared_sstable.hh" #include "utils/exponential_backoff_retry.hh" #include "utils/updateable_value.hh" #include "utils/serialized_action.hh" @@ -100,6 +101,8 @@ private: compaction_backlog_tracker backlog_tracker; + std::unordered_set sstables_requiring_cleanup; + explicit compaction_state(table_state& t); compaction_state(compaction_state&&) = delete; ~compaction_state(); @@ -420,6 +423,9 @@ public: return _tombstone_gc_state; }; + // Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set. + bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, owned_ranges_ptr owned_ranges_ptr); + friend class compacting_sstable_registration; friend class compaction_weight_registration; friend class compaction_manager_test; diff --git a/replica/database.hh b/replica/database.hh index 390867d805..ad8def5d3e 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1125,6 +1125,9 @@ public: // Safely iterate through table states, while performing async operations on them. future<> parallel_foreach_table_state(std::function(compaction::table_state&)> action); + // Add sst to or remove it from the sstables_requiring_cleanup set. 
+ bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, compaction::owned_ranges_ptr owned_ranges_ptr); + friend class compaction_group; }; diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 1641e38257..d58e65a02d 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -149,7 +149,7 @@ struct reshard_shard_descriptor { // This function assumes that the list of SSTables can be fairly big so it is careful to // manipulate it in a do_for_each loop (which yields) instead of using standard accumulators. future -collect_all_shared_sstables(sharded& dir, compaction::owned_ranges_ptr owned_ranges_ptr) { +collect_all_shared_sstables(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) { auto info_vec = sstables::sstable_directory::sstable_open_info_vector(); // We want to make sure that each distributed object reshards about the same amount of data. @@ -161,13 +161,13 @@ collect_all_shared_sstables(sharded& dir, compactio auto coordinator = this_shard_id(); // We will first move all of the foreign open info to temporary storage so that we can sort // them. We want to distribute bigger sstables first. 
- co_await dir.invoke_on_all([&info_vec, coordinator, owned_ranges_ptr] (sstables::sstable_directory& d) -> future<> { + co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> { auto shared_sstables = d.retrieve_shared_sstables(); sstables::sstable_directory::sstable_open_info_vector need_cleanup; if (owned_ranges_ptr) { - const auto& owned_ranges = *owned_ranges_ptr; + auto& table = db.local().find_column_family(ks_name, table_name); co_await d.do_for_each_sstable([&] (sstables::shared_sstable sst) -> future<> { - if (needs_cleanup(sst, owned_ranges)) { + if (table.update_sstable_cleanup_state(sst, owned_ranges_ptr)) { need_cleanup.push_back(co_await sst->get_open_info()); } }); @@ -304,7 +304,7 @@ future<> run_resharding_jobs(sharded& dir, std::vec // assigned. future<> distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, io_priority_class iop, compaction::owned_ranges_ptr owned_ranges_ptr) { - auto all_jobs = co_await collect_all_shared_sstables(dir, owned_ranges_ptr); + auto all_jobs = co_await collect_all_shared_sstables(dir, db, ks_name, table_name, owned_ranges_ptr); auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs)); co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), iop, std::move(owned_ranges_ptr)); } @@ -482,6 +482,11 @@ distributed_loader::process_upload_dir(distributed& db, distr // maybe triggerring resharding+cleanup unnecessarily for some sstables. // But this is resharding on refresh (sstable loading via upload dir), // which will usually require resharding anyway. 
+ // + // FIXME: take multiple compaction groups into account + // - segregate resharded tables into compaction groups + // - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction + // so that cleanup can be considered per compaction group auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(ks)); reshard(directory, db, ks, cf, [&] (shard_id shard) mutable { return make_sstable(*global_table, upload, new_generation_for_shard(shard)); diff --git a/replica/table.cc b/replica/table.cc index 3878e0b467..92b3cb385c 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2835,4 +2835,10 @@ table::as_data_dictionary() const { return _impl.wrap(*this); } +bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, compaction::owned_ranges_ptr owned_ranges_ptr) { + // FIXME: it's possible that the sstable belongs to multiple compaction_groups + auto& cg = compaction_group_for_sstable(sst); + return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, std::move(owned_ranges_ptr)); +} + } // namespace replica From db7fa9f3be1bb3deb8696ed0cb17ea308ca7a4af Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 31 Dec 2022 10:00:12 +0200 Subject: [PATCH 12/24] distributed_loader: reshard: update sstable cleanup state Since the sstables are loaded from foreign open info we should mark them for cleanup if needed (and owned_ranges_ptr is provided). This will allow a later patch to enable filtering for cleanup only for sstable sets containing sstables that require cleanup. 
Signed-off-by: Benny Halevy --- replica/distributed_loader.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index d58e65a02d..0b1a44d14d 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -242,8 +242,11 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: std::vector> buckets; buckets.reserve(num_jobs); buckets.emplace_back(); - co_await coroutine::parallel_for_each(shared_info, [&dir, sstables_per_job, num_jobs, &buckets] (sstables::foreign_sstable_open_info& info) -> future<> { + co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> { auto sst = co_await dir.load_foreign_sstable(info); + if (owned_ranges_ptr) { + table.update_sstable_cleanup_state(sst, owned_ranges_ptr); + } // Last bucket gets leftover SSTables if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) { buckets.emplace_back(); From d76568649125195fa54e5ef9b1fe9d3df4621519 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 31 Dec 2022 10:11:41 +0200 Subject: [PATCH 13/24] sstable_set: add for_each_sstable_until Calls a function on all sstables or until the function returns stop_iteration::yes. Change the sstable_set_impl interface to expose only for_each_sstable_until and let sstable_set::for_each_sstable use that, wrapping the void-returning function passed to it. 
Signed-off-by: Benny Halevy --- sstables/sstable_set.cc | 34 ++++++++++++++++++++++++---------- sstables/sstable_set.hh | 3 +++ sstables/sstable_set_impl.hh | 8 ++++---- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 93349dfe6e..c21bbcb2cc 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -162,7 +162,14 @@ sstable_set::all() const { } void sstable_set::for_each_sstable(std::function func) const { - return _impl->for_each_sstable(std::move(func)); + _impl->for_each_sstable_until([func = std::move(func)] (const shared_sstable& sst) { + func(sst); + return stop_iteration::no; + }); +} + +stop_iteration sstable_set::for_each_sstable_until(std::function func) const { + return _impl->for_each_sstable_until(std::move(func)); } void @@ -315,10 +322,13 @@ lw_shared_ptr partitioned_sstable_set::all() const { return _all; } -void partitioned_sstable_set::for_each_sstable(std::function func) const { +stop_iteration partitioned_sstable_set::for_each_sstable_until(std::function func) const { for (auto& sst : *_all) { - func(sst); + if (func(sst)) { + return stop_iteration::yes; + } } + return stop_iteration::no; } void partitioned_sstable_set::insert(shared_sstable sst) { @@ -451,10 +461,13 @@ time_series_sstable_set::size() const noexcept { return _sstables->size(); } -void time_series_sstable_set::for_each_sstable(std::function func) const { +stop_iteration time_series_sstable_set::for_each_sstable_until(std::function func) const { for (auto& entry : *_sstables) { - func(entry.second); + if (func(entry.second)) { + return stop_iteration::yes; + } } + return stop_iteration::no; } // O(log n) @@ -1022,12 +1035,13 @@ lw_shared_ptr compound_sstable_set::all() const { return ret; } -void compound_sstable_set::for_each_sstable(std::function func) const { +stop_iteration compound_sstable_set::for_each_sstable_until(std::function func) const { for (auto& set : _sets) { - 
set->for_each_sstable([&func] (const shared_sstable& sst) { - func(sst); - }); + if (set->for_each_sstable_until([&func] (const shared_sstable& sst) { return func(sst); })) { + return stop_iteration::yes; + } } + return stop_iteration::no; } void compound_sstable_set::insert(shared_sstable sst) { @@ -1219,7 +1233,7 @@ flat_mutation_reader_v2 sstable_set::make_crawling_reader( tracing::trace_state_ptr trace_ptr, read_monitor_generator& monitor_generator) const { std::vector readers; - _impl->for_each_sstable([&] (const shared_sstable& sst) mutable { + for_each_sstable([&] (const shared_sstable& sst) mutable { readers.emplace_back(sst->make_crawling_reader(schema, permit, pc, trace_ptr, monitor_generator(sst))); }); return make_combined_reader(schema, std::move(permit), std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no); diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 7dc4d9f486..067f53c657 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -66,6 +66,9 @@ public: lw_shared_ptr all() const; // Prefer for_each_sstable() over all() for iteration purposes, as the latter may have to copy all sstables into a temporary void for_each_sstable(std::function func) const; + // Calls func for each sstable or until it returns stop_iteration::yes + // Returns the last stop_iteration value. 
+ stop_iteration for_each_sstable_until(std::function func) const; void insert(shared_sstable sst); void erase(shared_sstable sst); size_t size() const noexcept; diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 17c61509d3..3ee17ef390 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -30,7 +30,7 @@ public: virtual std::vector select(const dht::partition_range& range) const = 0; virtual std::vector select_sstable_runs(const std::vector& sstables) const; virtual lw_shared_ptr all() const = 0; - virtual void for_each_sstable(std::function func) const = 0; + virtual stop_iteration for_each_sstable_until(std::function func) const = 0; virtual void insert(shared_sstable sst) = 0; virtual void erase(shared_sstable sst) = 0; virtual size_t size() const noexcept = 0; @@ -95,7 +95,7 @@ public: virtual std::vector select(const dht::partition_range& range) const override; virtual std::vector select_sstable_runs(const std::vector& sstables) const override; virtual lw_shared_ptr all() const override; - virtual void for_each_sstable(std::function func) const override; + virtual stop_iteration for_each_sstable_until(std::function func) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; virtual size_t size() const noexcept override; @@ -121,7 +121,7 @@ public: virtual std::unique_ptr clone() const override; virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual lw_shared_ptr all() const override; - virtual void for_each_sstable(std::function func) const override; + virtual stop_iteration for_each_sstable_until(std::function func) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; virtual size_t size() const noexcept override; @@ -161,7 +161,7 @@ public: virtual std::vector select(const dht::partition_range& range = 
query::full_partition_range) const override; virtual std::vector select_sstable_runs(const std::vector& sstables) const override; virtual lw_shared_ptr all() const override; - virtual void for_each_sstable(std::function func) const override; + virtual stop_iteration for_each_sstable_until(std::function func) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; virtual size_t size() const noexcept override; From 6ebafe74b9b38b2e48ccbed99123ccf8ee57882f Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 29 Dec 2022 14:18:41 +0200 Subject: [PATCH 14/24] table, compaction_manager: add requires_cleanup Returns true iff any of the sstables in the set requires cleanup. Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 5 +++++ compaction/compaction_manager.hh | 6 ++++++ replica/database.hh | 8 +++++++- replica/table.cc | 14 +++++++++++++- 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index ef56590fe3..a9a828c2e7 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1559,6 +1559,11 @@ bool compaction_manager::update_sstable_cleanup_state(table_state& t, const ssta } } +bool compaction_manager::requires_cleanup(table_state& t, const sstables::shared_sstable& sst) const { + const auto& cs = get_compaction_state(&t); + return cs.sstables_requiring_cleanup.contains(sst); +} + future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t) { auto check_for_cleanup = [this, &t] { return boost::algorithm::any_of(_tasks, [&t] (auto& task) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index f90795a6ec..249ff387bd 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -219,6 +219,9 @@ private: // gets the table's compaction state // throws std::out_of_range exception if not found. 
compaction_state& get_compaction_state(compaction::table_state* t); + const compaction_state& get_compaction_state(compaction::table_state* t) const { + return const_cast(this)->get_compaction_state(t); + } // Return true if compaction manager is enabled and // table still exists and compaction is not disabled for the table. @@ -426,6 +429,9 @@ public: // Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set. bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, owned_ranges_ptr owned_ranges_ptr); + // checks if the sstable is in the respective compaction_state.sstables_requiring_cleanup set. + bool requires_cleanup(table_state& t, const sstables::shared_sstable& sst) const; + friend class compacting_sstable_registration; friend class compaction_weight_registration; friend class compaction_manager_test; diff --git a/replica/database.hh b/replica/database.hh index ad8def5d3e..1772b690d6 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -549,7 +549,7 @@ private: // Select a compaction group from a given key. compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept; // Select a compaction group from a given sstable based on its token range. - compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) noexcept; + compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept; // Returns a list of all compaction groups. const std::vector>& compaction_groups() const noexcept; // Safely iterate through compaction groups, while performing async operations on them. @@ -1128,6 +1128,12 @@ public: // Add sst to or remove it from the sstables_requiring_cleanup set. bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, compaction::owned_ranges_ptr owned_ranges_ptr); + // Returns true if the sstable requires cleanup.
+ bool requires_cleanup(const sstables::shared_sstable& sst) const; + + // Returns true if any of the sstables requires cleanup. + bool requires_cleanup(const sstables::sstable_set& set) const; + friend class compaction_group; }; diff --git a/replica/table.cc b/replica/table.cc index 92b3cb385c..8678f75c08 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -557,7 +557,7 @@ compaction_group& table::compaction_group_for_key(partition_key_view key, const return compaction_group_for_token(dht::get_token(*s, key)); } -compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) noexcept { +compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept { // FIXME: a sstable can belong to more than one group, change interface to reflect that. return compaction_group_for_token(sst->get_first_decorated_key().token()); } @@ -2841,4 +2841,16 @@ bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, co return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, std::move(owned_ranges_ptr)); } +bool table::requires_cleanup(const sstables::shared_sstable& sst) const { + auto& cg = compaction_group_for_sstable(sst); + return get_compaction_manager().requires_cleanup(cg.as_table_state(), sst); +} + +bool table::requires_cleanup(const sstables::sstable_set& set) const { + return bool(set.for_each_sstable_until([this] (const sstables::shared_sstable &sst) { + auto& cg = compaction_group_for_sstable(sst); + return stop_iteration(_compaction_manager.requires_cleanup(cg.as_table_state(), sst)); + })); +} + } // namespace replica From bbfe839a739085fbb19ddbab24a73ef28476c481 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 30 Dec 2022 16:25:02 +0200 Subject: [PATCH 15/24] compaction_manager: get_candidates: mark as const Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 2 +- compaction/compaction_manager.hh | 2 +- 2 files changed, 2
insertions(+), 2 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index a9a828c2e7..652156aeb9 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -205,7 +205,7 @@ std::vector in_strategy_sstables(table_state& table_s) })); } -std::vector compaction_manager::get_candidates(table_state& t) { +std::vector compaction_manager::get_candidates(table_state& t) const { std::vector candidates; candidates.reserve(t.main_sstable_set().size()); // prevents sstables that belongs to a partial run being generated by ongoing compaction from being diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 249ff387bd..637957f880 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -206,7 +206,7 @@ private: void deregister_weight(int weight); // Get candidates for compaction strategy, which are all sstables but the ones being compacted. - std::vector get_candidates(compaction::table_state& t); + std::vector get_candidates(compaction::table_state& t) const; template requires std::same_as || std::sentinel_for From cac60a09acaf7d5a8f2d7612c359830e34bc03e9 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 30 Dec 2022 16:18:14 +0200 Subject: [PATCH 16/24] compaction_manager: refactor get_candidates Allow getting candidates for compaction from an arbitrary range of sstables, not only the in_strategy_sstables. To be used by perform_cleanup to mark all sstables that require cleanup, even if they can't be compacted at this time.
Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 13 +++++++++++-- compaction/compaction_manager.hh | 4 ++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 652156aeb9..67455fbf72 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -206,8 +206,14 @@ std::vector in_strategy_sstables(table_state& table_s) } std::vector compaction_manager::get_candidates(table_state& t) const { + return get_candidates(t, *t.main_sstable_set().all()); +} + +template +requires std::convertible_to, sstables::shared_sstable> +std::vector compaction_manager::get_candidates(table_state& t, const Range& sstables) const { std::vector candidates; - candidates.reserve(t.main_sstable_set().size()); + candidates.reserve(sstables.size()); // prevents sstables that belongs to a partial run being generated by ongoing compaction from being // selected for compaction, which could potentially result in wrong behavior. auto partial_run_identifiers = boost::copy_range>(_tasks @@ -215,7 +221,10 @@ std::vector compaction_manager::get_candidates(table_s | boost::adaptors::transformed(std::mem_fn(&compaction_task_executor::output_run_id))); // Filter out sstables that are being compacted. - for (auto& sst : in_strategy_sstables(t)) { + for (const auto& sst : sstables) { + if (!sstables::is_eligible_for_compaction(sst)) { + continue; + } if (_compacting_sstables.contains(sst)) { continue; } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 637957f880..0c7383050c 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -208,6 +208,10 @@ private: // Get candidates for compaction strategy, which are all sstables but the ones being compacted. 
std::vector get_candidates(compaction::table_state& t) const; + template + requires std::convertible_to, sstables::shared_sstable> + std::vector get_candidates(table_state& t, const Range& sstables) const; + template requires std::same_as || std::sentinel_for void register_compacting_sstables(Iterator first, Sentinel last); From 690697961c73823436eebfdc4e3d7e8d7d60312e Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 29 Dec 2022 15:52:03 +0200 Subject: [PATCH 17/24] compaction_manager: compacting_sstable_registration: keep a ref to the compaction_state To be used for managing sstables requiring cleanup. Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 67455fbf72..fc33878ade 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -32,14 +32,16 @@ using namespace compaction; class compacting_sstable_registration { compaction_manager& _cm; + compaction_manager::compaction_state& _cs; std::unordered_set _compacting; public: - explicit compacting_sstable_registration(compaction_manager& cm) noexcept + explicit compacting_sstable_registration(compaction_manager& cm, compaction_manager::compaction_state& cs) noexcept : _cm(cm) + , _cs(cs) { } - compacting_sstable_registration(compaction_manager& cm, std::vector compacting) - : compacting_sstable_registration(cm) + compacting_sstable_registration(compaction_manager& cm, compaction_manager::compaction_state& cs, std::vector compacting) + : compacting_sstable_registration(cm, cs) { register_compacting(compacting); } @@ -57,6 +59,7 @@ public: compacting_sstable_registration(compacting_sstable_registration&& other) noexcept : _cm(other._cm) + , _cs(other._cs) , _compacting(std::move(other._compacting)) { } @@ -422,7 +425,7 @@ protected: table_state* t = _compacting_table; sstables::compaction_strategy cs = 
t->get_compaction_strategy(); sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t)); - auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); + auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(t), descriptor.sstables); auto release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { compacting.release_compacting(exhausted_sstables); }; @@ -1020,7 +1023,7 @@ protected: _cm.postpone_compaction_for_table(&t); co_return std::nullopt; } - auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); + auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), descriptor.sstables); auto weight_r = compaction_weight_registration(&_cm, weight); auto release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { compacting.release_compacting(exhausted_sstables); @@ -1364,7 +1367,7 @@ future compaction_manager::perform_tas // in the re-write, we need to barrier out any previously running // compaction. std::vector sstables; - compacting_sstable_registration compacting(*this); + compacting_sstable_registration compacting(*this, get_compaction_state(&t)); co_await run_with_compaction_disabled(t, [&sstables, &compacting, get_func = std::move(get_func)] () -> future<> { // Getting sstables and registering them as compacting must be atomic, to avoid a race condition where // regular compaction runs in between and picks the same files. From 73280c0a15b9160ae175f2a17ed99ed4ec6afc90 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 19 Jan 2023 15:23:56 +0200 Subject: [PATCH 18/24] compaction: refactor compaction_fwd.hh out of compaction_descriptor.hh So it can be used in the next patch that will refactor compaction_state out of class compaction_manager. 
Signed-off-by: Benny Halevy --- compaction/compaction_descriptor.hh | 11 +---------- compaction/compaction_fwd.hh | 25 +++++++++++++++++++++++++ compaction/compaction_strategy_impl.hh | 5 ----- compaction/strategy_control.hh | 4 ++-- replica/compaction_group.hh | 5 +---- replica/database.hh | 5 +---- 6 files changed, 30 insertions(+), 25 deletions(-) create mode 100644 compaction/compaction_fwd.hh diff --git a/compaction/compaction_descriptor.hh b/compaction/compaction_descriptor.hh index ec1361f726..449b581ce7 100644 --- a/compaction/compaction_descriptor.hh +++ b/compaction/compaction_descriptor.hh @@ -19,16 +19,7 @@ #include "utils/UUID.hh" #include "dht/i_partitioner.hh" #include "compaction_weight_registration.hh" - -namespace compaction { - -using owned_ranges_ptr = lw_shared_ptr; - -inline owned_ranges_ptr make_owned_ranges_ptr(dht::token_range_vector&& ranges) { - return make_lw_shared(std::move(ranges)); -} - -} // namespace compaction +#include "compaction_fwd.hh" namespace sstables { diff --git a/compaction/compaction_fwd.hh b/compaction/compaction_fwd.hh new file mode 100644 index 0000000000..eb85dc0d1f --- /dev/null +++ b/compaction/compaction_fwd.hh @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2023-present ScyllaDB + * + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "dht/i_partitioner.hh" + +namespace compaction { + +class table_state; +class strategy_control; + +using owned_ranges_ptr = lw_shared_ptr; + +inline owned_ranges_ptr make_owned_ranges_ptr(dht::token_range_vector&& ranges) { + return make_lw_shared(std::move(ranges)); +} + +} // namespace compaction diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index afea050cb2..6342b6f203 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -15,11 +15,6 @@ #include "compaction_descriptor.hh" #include "tombstone_gc.hh" -namespace compaction { -class table_state; -class 
strategy_control; -} - namespace sstables { class sstable_set_impl; diff --git a/compaction/strategy_control.hh b/compaction/strategy_control.hh index 7e4b3f9a4d..bf7231198e 100644 --- a/compaction/strategy_control.hh +++ b/compaction/strategy_control.hh @@ -9,9 +9,9 @@ #pragma once -namespace compaction { +#include "compaction/compaction_fwd.hh" -class table_state; +namespace compaction { // Used by manager to set goals and constraints on compaction strategies class strategy_control { diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index e9e4f5a6b1..309327019b 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -11,13 +11,10 @@ #include "compaction/compaction_backlog_manager.hh" #include "compaction/compaction_strategy_state.hh" #include "sstables/sstable_set.hh" +#include "compaction/compaction_fwd.hh" #pragma once -namespace compaction { -class table_state; -} - namespace replica { using enable_backlog_tracker = bool_class; diff --git a/replica/database.hh b/replica/database.hh index 1772b690d6..714b2eb10b 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -63,6 +63,7 @@ #include "db/rate_limiter.hh" #include "db/operation_type.hh" #include "utils/serialized_action.hh" +#include "compaction/compaction_fwd.hh" #include "compaction/compaction_manager.hh" #include "utils/disk-error-handler.hh" #include "rust/wasmtime_bindings.hh" @@ -98,10 +99,6 @@ class directory_semaphore; } -namespace compaction { -class table_state; -} - namespace ser { template class serializer; From b3192b9f165e5062d04d5eafc928126d10b6ecb2 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 19 Jan 2023 15:45:01 +0200 Subject: [PATCH 19/24] compaction: refactor compaction_state out of compaction_manager To use it both from compaction_manager and compaction_descriptor in a following patch. 
Signed-off-by: Benny Halevy --- compaction/compaction_fwd.hh | 1 + compaction/compaction_manager.cc | 12 ++++---- compaction/compaction_manager.hh | 34 +++------------------- compaction/compaction_state.hh | 48 ++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 36 deletions(-) create mode 100644 compaction/compaction_state.hh diff --git a/compaction/compaction_fwd.hh b/compaction/compaction_fwd.hh index eb85dc0d1f..bd8b118134 100644 --- a/compaction/compaction_fwd.hh +++ b/compaction/compaction_fwd.hh @@ -15,6 +15,7 @@ namespace compaction { class table_state; class strategy_control; +struct compaction_state; using owned_ranges_ptr = lw_shared_ptr; diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index fc33878ade..5bf707e4f1 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -32,15 +32,15 @@ using namespace compaction; class compacting_sstable_registration { compaction_manager& _cm; - compaction_manager::compaction_state& _cs; + compaction::compaction_state& _cs; std::unordered_set _compacting; public: - explicit compacting_sstable_registration(compaction_manager& cm, compaction_manager::compaction_state& cs) noexcept + explicit compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs) noexcept : _cm(cm) , _cs(cs) { } - compacting_sstable_registration(compaction_manager& cm, compaction_manager::compaction_state& cs, std::vector compacting) + compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs, std::vector compacting) : compacting_sstable_registration(cm, cs) { register_compacting(compacting); @@ -273,7 +273,7 @@ private: virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; -compaction_manager::compaction_state& compaction_manager::get_compaction_state(table_state* t) { +compaction::compaction_state& compaction_manager::get_compaction_state(table_state* t) { try { return 
_compaction_state.at(t); } catch (std::out_of_range&) { @@ -580,7 +580,7 @@ inline compaction_controller make_compaction_controller(const compaction_manager return compaction_controller(csg, static_shares, 250ms, std::move(fn)); } -compaction_manager::compaction_state::~compaction_state() { +compaction::compaction_state::~compaction_state() { compaction_done.broken(); } @@ -1663,7 +1663,7 @@ future compaction_manager::perform_sst }, can_purge_tombstones::no); } -compaction_manager::compaction_state::compaction_state(table_state& t) +compaction::compaction_state::compaction_state(table_state& t) : backlog_tracker(t.get_compaction_strategy().make_backlog_tracker()) { } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 0c7383050c..9faaf10cf0 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -34,6 +34,7 @@ #include "compaction_weight_registration.hh" #include "compaction_backlog_manager.hh" #include "compaction/task_manager_module.hh" +#include "compaction_state.hh" #include "strategy_control.hh" #include "backlog_controller.hh" #include "seastarx.hh" @@ -84,33 +85,6 @@ public: utils::updateable_value static_shares = utils::updateable_value(0); utils::updateable_value throughput_mb_per_sec = utils::updateable_value(0); }; -private: - struct compaction_state { - // Used both by compaction tasks that refer to the compaction_state - // and by any function running under run_with_compaction_disabled(). - seastar::gate gate; - - // Prevents table from running major and minor compaction at the same time. - rwlock lock; - - // Raised by any function running under run_with_compaction_disabled(); - long compaction_disabled_counter = 0; - - // Signaled whenever a compaction task completes. 
- condition_variable compaction_done; - - compaction_backlog_tracker backlog_tracker; - - std::unordered_set sstables_requiring_cleanup; - - explicit compaction_state(table_state& t); - compaction_state(compaction_state&&) = delete; - ~compaction_state(); - - bool compaction_disabled() const noexcept { - return compaction_disabled_counter > 0; - } - }; public: class can_purge_tombstones_tag; @@ -353,7 +327,7 @@ public: class compaction_reenabler { compaction_manager& _cm; compaction::table_state* _table; - compaction_manager::compaction_state& _compaction_state; + compaction::compaction_state& _compaction_state; gate::holder _holder; public: @@ -366,7 +340,7 @@ public: return _table; } - const compaction_manager::compaction_state& compaction_state() const noexcept { + const compaction::compaction_state& compaction_state() const noexcept { return _compaction_state; } }; @@ -473,7 +447,7 @@ public: protected: compaction_manager& _cm; ::compaction::table_state* _compacting_table = nullptr; - compaction_manager::compaction_state& _compaction_state; + compaction::compaction_state& _compaction_state; sstables::compaction_data _compaction_data; state _state = state::none; diff --git a/compaction/compaction_state.hh b/compaction/compaction_state.hh new file mode 100644 index 0000000000..d235032c01 --- /dev/null +++ b/compaction/compaction_state.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include +#include +#include +#include "seastarx.hh" + +#include "compaction/compaction_fwd.hh" +#include "compaction/compaction_backlog_manager.hh" + +namespace compaction { + +struct compaction_state { + // Used both by compaction tasks that refer to the compaction_state + // and by any function running under run_with_compaction_disabled(). + seastar::gate gate; + + // Prevents table from running major and minor compaction at the same time. 
+ seastar::rwlock lock; + + // Raised by any function running under run_with_compaction_disabled(); + long compaction_disabled_counter = 0; + + // Signaled whenever a compaction task completes. + condition_variable compaction_done; + + compaction_backlog_tracker backlog_tracker; + + std::unordered_set sstables_requiring_cleanup; + + explicit compaction_state(table_state& t); + compaction_state(compaction_state&&) = delete; + ~compaction_state(); + + bool compaction_disabled() const noexcept { + return compaction_disabled_counter > 0; + } +}; + +} // namespace compaction From c2bf0e0b72fdfdf92d8552a276a26f6a873ef729 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 28 Dec 2022 16:06:46 +0200 Subject: [PATCH 20/24] compaction_manager: perform_cleanup: keep sstables in compaction_state::sstables_requiring_cleanup As a first step towards parallel cleanup by (regular) compaction and cleanup compaction, filter all sstables in perform_cleanup and keep the set of sstables in the compaction_state. Erase from that set when the sstables are unregistered from compaction.
Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 5bf707e4f1..ee85bc932b 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -80,6 +80,7 @@ public: _cm.deregister_compacting_sstables(sstables.begin(), sstables.end()); for (const auto& sst : sstables) { _compacting.erase(sst); + _cs.sstables_requiring_cleanup.erase(sst); } } }; @@ -1591,15 +1592,22 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range throw std::runtime_error("cleanup request failed: sorted_owned_ranges is empty"); } + // Called with compaction_disabled auto get_sstables = [this, &t, sorted_owned_ranges] () -> future> { return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { - auto sstables = std::vector{}; - const auto candidates = get_candidates(t); - std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&] (const sstables::shared_sstable& sst) { - seastar::thread::maybe_yield(); - return update_sstable_cleanup_state(t, sst, sorted_owned_ranges); - }); - return sstables; + auto update_sstables_cleanup_state = [&] (const sstables::sstable_set& set) { + set.for_each_sstable([&] (const sstables::shared_sstable& sst) { + update_sstable_cleanup_state(t, sst, sorted_owned_ranges); + seastar::thread::maybe_yield(); + }); + }; + update_sstables_cleanup_state(t.main_sstable_set()); + update_sstables_cleanup_state(t.maintenance_sstable_set()); + // Some sstables may remain in sstables_requiring_cleanup + // for later processing if they can't be cleaned up right now. 
+ // They are erased from sstables_requiring_cleanup by compacting.release_compacting + auto& cs = get_compaction_state(&t); + return get_candidates(t, cs.sstables_requiring_cleanup); }); }; From d87925d9fc524e2371a7873405371490e36d290f Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 29 Dec 2022 15:48:15 +0200 Subject: [PATCH 21/24] compaction_manager: keep owned_ranges_ptr in compaction_state When perform_cleanup adds sstables to sstables_requiring_cleanup, also save the owned_ranges_ptr in the compaction_state so it could be used by other compaction types like regular, reshape, or major compaction. When the exhausted sstables are released, check if sstables_requiring_cleanup is empty, and if it is, clear also the owned_ranges_ptr. Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 6 ++++++ compaction/compaction_state.hh | 1 + 2 files changed, 7 insertions(+) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index ee85bc932b..ea822f5cb9 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -82,6 +82,9 @@ public: _compacting.erase(sst); _cs.sstables_requiring_cleanup.erase(sst); } + if (_cs.sstables_requiring_cleanup.empty()) { + _cs.owned_ranges_ptr = nullptr; + } } }; @@ -1607,6 +1610,9 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range // for later processing if they can't be cleaned up right now. 
// They are erased from sstables_requiring_cleanup by compacting.release_compacting auto& cs = get_compaction_state(&t); + if (!cs.sstables_requiring_cleanup.empty()) { + cs.owned_ranges_ptr = std::move(sorted_owned_ranges); + } return get_candidates(t, cs.sstables_requiring_cleanup); }); }; diff --git a/compaction/compaction_state.hh b/compaction/compaction_state.hh index d235032c01..729992e9b1 100644 --- a/compaction/compaction_state.hh +++ b/compaction/compaction_state.hh @@ -35,6 +35,7 @@ struct compaction_state { compaction_backlog_tracker backlog_tracker; std::unordered_set sstables_requiring_cleanup; + owned_ranges_ptr owned_ranges_ptr; explicit compaction_state(table_state& t); compaction_state(compaction_state&&) = delete; From 9105f9800c809e39cbcd5fc99f7c488f7af30cad Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 26 Feb 2023 11:50:30 +0200 Subject: [PATCH 22/24] sstables: add a printer for shared_sstable Refactor the printing logic in compaction::formatted_sstables_list out to sstables::to_string(const shared_sstable&, bool include_origin) and operator<<(const shared_sstable) on top of it. So that we can easily print std::vector from compaction_manager in the next patch. 
Signed-off-by: Benny Halevy --- compaction/compaction.cc | 6 +----- sstables/shared_sstable.hh | 21 ++++++++++++++++++++- sstables/sstables.cc | 6 ++++++ test/boost/sstable_compaction_test.cc | 25 +++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 83feb90dfd..e32e1f5b9a 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -424,11 +424,7 @@ public: } } formatted_sstables_list& operator+=(const shared_sstable& sst) { - if (_include_origin) { - _ssts.emplace_back(format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin())); - } else { - _ssts.emplace_back(format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level())); - } + _ssts.emplace_back(to_string(sst, _include_origin)); return *this; } friend std::ostream& operator<<(std::ostream& os, const formatted_sstables_list& lst); diff --git a/sstables/shared_sstable.hh b/sstables/shared_sstable.hh index 4bd1d355dd..a8c47881ac 100644 --- a/sstables/shared_sstable.hh +++ b/sstables/shared_sstable.hh @@ -12,6 +12,9 @@ #include #include #include + +#include + #include namespace sstables { @@ -35,6 +38,22 @@ namespace sstables { using shared_sstable = seastar::lw_shared_ptr; using sstable_list = std::unordered_set; +std::string to_string(const shared_sstable& sst, bool include_origin = true); + +} // namespace sstables + +template <> +struct fmt::formatter : fmt::formatter { + template + auto format(const sstables::shared_sstable& sst, FormatContext& ctx) const { + return fmt::format_to(ctx.out(), "{}", sstables::to_string(sst)); + } +}; + +namespace std { + +inline std::ostream& operator<<(std::ostream& os, const sstables::shared_sstable& sst) { + return os << fmt::format("{}", sst); } - +} // namespace std diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 4ced345196..79badc2c44 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3412,6 
+3412,12 @@ future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir) { } } +std::string to_string(const shared_sstable& sst, bool include_origin) { + return include_origin ? + fmt::format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin()) : + fmt::format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level()); +} + } // namespace sstables namespace seastar { diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 4dc622ee32..d57268650a 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4886,3 +4886,28 @@ SEASTAR_TEST_CASE(compaction_manager_stop_and_drain_race_test) { testlog.info("stopping compaction manager"); co_await cm.stop(); } + +SEASTAR_TEST_CASE(test_print_shared_sstables_vector) { + return test_env::do_with_async([] (test_env& env) { + simple_schema ss; + auto s = ss.schema(); + auto pks = ss.make_pkeys(2); + auto sst_gen = env.make_sst_factory(s); + + std::vector ssts(2); + + auto mut0 = mutation(s, pks[0]); + mut0.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp()); + ssts[0] = make_sstable_containing(sst_gen, {std::move(mut0)}); + + auto mut1 = mutation(s, pks[1]); + mut1.partition().apply_insert(*s, ss.make_ckey(1), ss.new_timestamp()); + ssts[1] = make_sstable_containing(sst_gen, {std::move(mut1)}); + + std::string msg = format("{}", ssts); + for (const auto& sst : ssts) { + auto gen_str = format("{}", sst->generation()); + BOOST_REQUIRE(msg.find(gen_str) != std::string::npos); + } + }); +} From 4db961ecac4a19e547dbe81b70890b2dd1080e54 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 30 Dec 2022 18:10:54 +0200 Subject: [PATCH 23/24] compaction_manager: compact_sstables: retrieve owned ranges if required If any of the sstables to-be-compacted requires cleanup, retrieve the owned_ranges_ptr from the table_state.
With that, staging sstables will eventually be cleaned up via regular compaction. Refs #9559 Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index ea822f5cb9..8c49854cbf 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -361,6 +361,24 @@ future compaction_task_executor::compact_sstables(s } }; + // retrieve owned_ranges if required + if (!descriptor.owned_ranges) { + std::vector sstables_requiring_cleanup; + const auto& cs = _cm.get_compaction_state(_compacting_table); + for (const auto& sst : descriptor.sstables) { + if (cs.sstables_requiring_cleanup.contains(sst)) { + sstables_requiring_cleanup.emplace_back(sst); + } + } + if (!sstables_requiring_cleanup.empty()) { + cmlog.info("The following SSTables require cleaned up in this compaction: {}", sstables_requiring_cleanup); + if (!cs.owned_ranges_ptr) { + on_internal_error_noexcept(cmlog, "SSTables require cleanup but compaction state has null owned ranges"); + } + descriptor.owned_ranges = cs.owned_ranges_ptr; + } + } + co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t); } future<> compaction_task_executor::update_history(table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) { From 96660b2ef7262cbed59a01c26c4684bc04253f85 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 26 Feb 2023 13:36:05 +0200 Subject: [PATCH 24/24] table: discard_sstables: update_sstable_cleanup_state when deleting sstables We need to remove the deleted sstables from update_sstable_cleanup_state, otherwise their data and index files will remain open and their storage space won't be reclaimed.
Signed-off-by: Benny Halevy --- replica/table.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/replica/table.cc b/replica/table.cc index 8678f75c08..8fef35dafb 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1910,11 +1910,12 @@ future table::discard_sstables(db_clock::time_point truncat tlogger.debug("cleaning out row cache"); })); rebuild_statistics(); - co_await coroutine::parallel_for_each(p->remove, [p] (pruner::removed_sstable& r) { + co_await coroutine::parallel_for_each(p->remove, [this, p] (pruner::removed_sstable& r) -> future<> { if (r.enable_backlog_tracker) { remove_sstable_from_backlog_tracker(r.cg.get_backlog_tracker(), r.sst); } - return sstables::sstable_directory::delete_atomically({r.sst}); + co_await sstables::sstable_directory::delete_atomically({r.sst}); + update_sstable_cleanup_state(r.sst, {}); }); co_return p->rp; }