From 2a9bfa3e3fb50661f7643f4b3d3d442a3cb74d07 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 29 Mar 2022 09:29:00 -0300 Subject: [PATCH 1/4] compaction_strategy: get_cleanup_compaction_jobs: accept candidates by value Then caller can decide whether to copy or move candidate set into the function. cleanup_sstables_compaction_task can move candidates as it's no longer needed once it retrieves all descriptors. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 2 +- compaction/compaction_strategy.cc | 6 +++--- compaction/compaction_strategy.hh | 2 +- compaction/compaction_strategy_impl.hh | 2 +- compaction/size_tiered_compaction_strategy.cc | 2 +- compaction/size_tiered_compaction_strategy.hh | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 336276d0ac..2b8010ee0a 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1193,7 +1193,7 @@ public: : task(mgr, t, options.type(), sstring(sstables::to_string(options.type()))) , _cleanup_options(std::move(options)) , _compacting(std::move(compacting)) - , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), candidates)) + , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), std::move(candidates))) { // Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs // will have more space available released by previous jobs. diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 75bc81048a..9601dd383b 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -40,7 +40,7 @@ compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_s return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority()); } -std::vector compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const { +std::vector compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { // The default implementation is suboptimal and causes the writeamp problem described issue in #10097. // The compaction strategy relying on it should strive to implement its own method, to make cleanup bucket aware. return boost::copy_range>(candidates | boost::adaptors::transformed([] (const shared_sstable& sst) { @@ -673,8 +673,8 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(table_state& return _compaction_strategy_impl->get_major_compaction_job(table_s, std::move(candidates)); } -std::vector compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const { - return _compaction_strategy_impl->get_cleanup_compaction_jobs(table_s, candidates); +std::vector compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { + return _compaction_strategy_impl->get_cleanup_compaction_jobs(table_s, std::move(candidates)); } void compaction_strategy::notify_completion(const std::vector& removed, const std::vector& added) { diff --git a/compaction/compaction_strategy.hh b/compaction/compaction_strategy.hh index 2d771e8e4d..c72b08b95a 100644 --- a/compaction/compaction_strategy.hh +++ b/compaction/compaction_strategy.hh @@ -49,7 +49,7 @@ public: compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector candidates); - std::vector get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const; + std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const; // Some strategies may look at the compacted and resulting sstables to // get some useful information for subsequent compactions. diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index 5311a35065..e9573622cd 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -47,7 +47,7 @@ public: virtual ~compaction_strategy_impl() {} virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) = 0; virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector candidates); - virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const; + virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const; virtual void notify_completion(const std::vector& removed, const std::vector& added) { } virtual compaction_strategy_type type() const = 0; virtual bool parallel_compaction() const { diff --git a/compaction/size_tiered_compaction_strategy.cc b/compaction/size_tiered_compaction_strategy.cc index ff6691a711..8b8cefe384 100644 --- a/compaction/size_tiered_compaction_strategy.cc +++ b/compaction/size_tiered_compaction_strategy.cc @@ -268,7 +268,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector i } std::vector -size_tiered_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const { +size_tiered_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { std::vector ret; const auto& schema = table_s.schema(); unsigned max_threshold = schema->max_compaction_threshold(); diff --git a/compaction/size_tiered_compaction_strategy.hh b/compaction/size_tiered_compaction_strategy.hh index 54d16c0628..3bcfcec362 100644 --- a/compaction/size_tiered_compaction_strategy.hh +++ b/compaction/size_tiered_compaction_strategy.hh @@ -125,7 +125,7 @@ public: virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) override; - virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const override; + virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const override; static int64_t estimated_pending_compactions(const std::vector& sstables, int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options); From 8f4c04c38a01dd9651dbe08c60b07a237ed7a2b9 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 29 Mar 2022 09:31:30 -0300 Subject: [PATCH 2/4] compaction: TWCS: change get_buckets() signature to work with const qualified functions Signed-off-by: Raphael S. Carvalho --- compaction/time_window_compaction_strategy.cc | 2 +- compaction/time_window_compaction_strategy.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index d912122429..27c57cf901 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -303,7 +303,7 @@ time_window_compaction_strategy::get_window_lower_bound(std::chrono::seconds sst } std::pair>, timestamp_type> -time_window_compaction_strategy::get_buckets(std::vector files, time_window_compaction_strategy_options& options) { +time_window_compaction_strategy::get_buckets(std::vector files, const time_window_compaction_strategy_options& options) { std::map> buckets; timestamp_type max_timestamp = 0; diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index 7bdbc95f73..df81da9afc 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -123,7 +123,7 @@ public: // @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader), // and the right is the highest timestamp seen static std::pair>, timestamp_type> - get_buckets(std::vector files, time_window_compaction_strategy_options& options); + get_buckets(std::vector files, const time_window_compaction_strategy_options& options); std::vector newest_bucket(table_state& table_s, strategy_control& control, std::map> buckets, From 568bb40127bece1db6f68365617c0187e1f656cf Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 29 Mar 2022 09:35:09 -0300 Subject: [PATCH 3/4] compaction: TWCS: Implement cleanup method for bucket awareness This continues the work in a69d98c3d0cba01bdb17ad550b53bbb0a336e8f2, by implementing the cleanup method in TWCS to make it bucket aware. Till now, the default impl was used which cleanups on file at a time, starting from the smallest. The cleanup strategy for TWCS is simple. It's simply calling the size tiered cleanup method for each bucket, so there will be one job for each tier in each window. The next strategies to receive this improvement are LCS and ICS (the latter one being only available in enterprise). Refs #10097. Signed-off-by: Raphael S. Carvalho --- compaction/time_window_compaction_strategy.cc | 10 ++++++++++ compaction/time_window_compaction_strategy.hh | 2 ++ 2 files changed, 12 insertions(+) diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index 27c57cf901..ba44c92cd6 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -408,4 +408,14 @@ void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std:: _estimated_remaining_tasks = n; } +std::vector +time_window_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { + std::vector ret; + for (auto&& [_, sstables] : get_buckets(std::move(candidates), _options).first) { + auto per_window_jobs = size_tiered_compaction_strategy(_stcs_options).get_cleanup_compaction_jobs(table_s, std::move(sstables)); + std::move(per_window_jobs.begin(), per_window_jobs.end(), std::back_inserter(ret)); + } + return ret; +} + } diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index df81da9afc..a9e79ce85e 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -88,6 +88,8 @@ public: public: time_window_compaction_strategy(const std::map& options); virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) override; + + virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const override; private: static timestamp_type to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) { From a1fd9c1ee8d3214f0f789c2e4c10eccab8f45b89 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 29 Mar 2022 09:37:11 -0300 Subject: [PATCH 4/4] tests: sstable_compaction_test: Add test for TWCS' bucket-aware cleanup Signed-off-by: Raphael S. Carvalho --- test/boost/sstable_compaction_test.cc | 29 +++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 080a0a1f63..fb8f7d4212 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4926,12 +4926,15 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { return test_env::do_with_async([] (test_env& env) { constexpr size_t all_files = 64; - auto get_cleanup_jobs = [&env, &all_files] (sstables::compaction_strategy_type compaction_strategy_type) { + auto get_cleanup_jobs = [&env, &all_files] (sstables::compaction_strategy_type compaction_strategy_type, + std::map strategy_options = {}, + const api::timestamp_clock::duration step_base = 0ms) { auto builder = schema_builder("tests", "test_compaction_strategy_cleanup_method") .with_column("id", utf8_type, column_kind::partition_key) .with_column("cl", int32_type, column_kind::clustering_key) .with_column("value", int32_type); builder.set_compaction_strategy(compaction_strategy_type); + builder.set_compaction_strategy_options(std::move(strategy_options)); auto s = builder.build(); auto tmp = tmpdir(); @@ -4944,18 +4947,24 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { sstables::get_highest_sstable_version(), big); }; - auto make_mutation = [&](unsigned pkey_idx) { + using namespace std::chrono; + auto now = gc_clock::now().time_since_epoch() + duration_cast(seconds(tests::random::get_int(0, 3600*24))); + auto next_timestamp = [&now] (microseconds step) mutable -> api::timestamp_type { + return (now + step).count(); + }; + auto make_mutation = [&] (unsigned pkey_idx, api::timestamp_type ts) { auto pkey = partition_key::from_exploded(*s, {to_bytes(tokens[pkey_idx].first)}); mutation m(s, pkey); auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)}); - m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(1)), gc_clock::now().time_since_epoch().count()); + m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(1)), ts); return m; }; std::vector candidates; candidates.reserve(all_files); for (auto i = 0; i < all_files; i++) { - candidates.push_back(make_sstable_containing(sst_gen, {make_mutation(i)})); + auto current_step = duration_cast(step_base) * i; + candidates.push_back(make_sstable_containing(sst_gen, {make_mutation(i, next_timestamp(current_step))})); } auto strategy = cf->get_compaction_strategy(); @@ -4963,9 +4972,10 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { return std::make_pair(std::move(candidates), std::move(jobs)); }; - auto run_cleanup_strategy_test = [&] (sstables::compaction_strategy_type compaction_strategy_type, size_t per_job_files) { + auto run_cleanup_strategy_test = [&] (sstables::compaction_strategy_type compaction_strategy_type, size_t per_job_files, auto&&... args) { + testlog.info("Running cleanup test for strategy type {}", compaction_strategy::name(compaction_strategy_type)); size_t target_job_count = all_files / per_job_files; - auto [candidates, descriptors] = get_cleanup_jobs(compaction_strategy_type); + auto [candidates, descriptors] = get_cleanup_jobs(compaction_strategy_type, std::forward(args)...); BOOST_REQUIRE(descriptors.size() == target_job_count); auto generations = boost::copy_range>(candidates | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::generation))); auto check_desc = [&] (const auto& desc) { @@ -4984,5 +4994,12 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { // Default implementation: check that it will return one job for each file run_cleanup_strategy_test(sstables::compaction_strategy_type::null, 1); + + // TWCS: Check that it will return one job for each time window + std::map twcs_opts = { + {time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"}, + {time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"}, + }; + run_cleanup_strategy_test(sstables::compaction_strategy_type::time_window, 1, std::move(twcs_opts), 1h); }); }