diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 1d0c87d0c6..757451ca42 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1193,7 +1193,7 @@ public: : task(mgr, t, options.type(), sstring(sstables::to_string(options.type()))) , _cleanup_options(std::move(options)) , _compacting(std::move(compacting)) - , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), candidates)) + , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), std::move(candidates))) { // Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs // will have more space available released by previous jobs. diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 75bc81048a..9601dd383b 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -40,7 +40,7 @@ compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_s return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority()); } -std::vector compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const { +std::vector compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { // The default implementation is suboptimal and causes the writeamp problem described issue in #10097. // The compaction strategy relying on it should strive to implement its own method, to make cleanup bucket aware. return boost::copy_range>(candidates | boost::adaptors::transformed([] (const shared_sstable& sst) { @@ -673,8 +673,8 @@ compaction_descriptor compaction_strategy::get_major_compaction_job(table_state& return _compaction_strategy_impl->get_major_compaction_job(table_s, std::move(candidates)); } -std::vector compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const { - return _compaction_strategy_impl->get_cleanup_compaction_jobs(table_s, candidates); +std::vector compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { + return _compaction_strategy_impl->get_cleanup_compaction_jobs(table_s, std::move(candidates)); } void compaction_strategy::notify_completion(const std::vector& removed, const std::vector& added) { diff --git a/compaction/compaction_strategy.hh b/compaction/compaction_strategy.hh index 2d771e8e4d..c72b08b95a 100644 --- a/compaction/compaction_strategy.hh +++ b/compaction/compaction_strategy.hh @@ -49,7 +49,7 @@ public: compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector candidates); - std::vector get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const; + std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const; // Some strategies may look at the compacted and resulting sstables to // get some useful information for subsequent compactions. diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index 5311a35065..e9573622cd 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -47,7 +47,7 @@ public: virtual ~compaction_strategy_impl() {} virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) = 0; virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector candidates); - virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const; + virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const; virtual void notify_completion(const std::vector& removed, const std::vector& added) { } virtual compaction_strategy_type type() const = 0; virtual bool parallel_compaction() const { diff --git a/compaction/size_tiered_compaction_strategy.cc b/compaction/size_tiered_compaction_strategy.cc index ff6691a711..8b8cefe384 100644 --- a/compaction/size_tiered_compaction_strategy.cc +++ b/compaction/size_tiered_compaction_strategy.cc @@ -268,7 +268,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector i } std::vector -size_tiered_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const { +size_tiered_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { std::vector ret; const auto& schema = table_s.schema(); unsigned max_threshold = schema->max_compaction_threshold(); diff --git a/compaction/size_tiered_compaction_strategy.hh b/compaction/size_tiered_compaction_strategy.hh index 54d16c0628..3bcfcec362 100644 --- a/compaction/size_tiered_compaction_strategy.hh +++ b/compaction/size_tiered_compaction_strategy.hh @@ -125,7 +125,7 @@ public: virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) override; - virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, const std::vector& candidates) const override; + virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const override; static int64_t estimated_pending_compactions(const std::vector& sstables, int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options); diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index d912122429..ba44c92cd6 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -303,7 +303,7 @@ time_window_compaction_strategy::get_window_lower_bound(std::chrono::seconds sst } std::pair>, timestamp_type> -time_window_compaction_strategy::get_buckets(std::vector files, time_window_compaction_strategy_options& options) { +time_window_compaction_strategy::get_buckets(std::vector files, const time_window_compaction_strategy_options& options) { std::map> buckets; timestamp_type max_timestamp = 0; @@ -408,4 +408,14 @@ void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std:: _estimated_remaining_tasks = n; } +std::vector +time_window_compaction_strategy::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { + std::vector ret; + for (auto&& [_, sstables] : get_buckets(std::move(candidates), _options).first) { + auto per_window_jobs = size_tiered_compaction_strategy(_stcs_options).get_cleanup_compaction_jobs(table_s, std::move(sstables)); + std::move(per_window_jobs.begin(), per_window_jobs.end(), std::back_inserter(ret)); + } + return ret; +} + } diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index 7bdbc95f73..a9e79ce85e 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -88,6 +88,8 @@ public: public: time_window_compaction_strategy(const std::map& options); virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) override; + + virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const override; private: static timestamp_type to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) { @@ -123,7 +125,7 @@ public: // @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader), // and the right is the highest timestamp seen static std::pair>, timestamp_type> - get_buckets(std::vector files, time_window_compaction_strategy_options& options); + get_buckets(std::vector files, const time_window_compaction_strategy_options& options); std::vector newest_bucket(table_state& table_s, strategy_control& control, std::map> buckets, diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 080a0a1f63..fb8f7d4212 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4926,12 +4926,15 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { return test_env::do_with_async([] (test_env& env) { constexpr size_t all_files = 64; - auto get_cleanup_jobs = [&env, &all_files] (sstables::compaction_strategy_type compaction_strategy_type) { + auto get_cleanup_jobs = [&env, &all_files] (sstables::compaction_strategy_type compaction_strategy_type, + std::map strategy_options = {}, + const api::timestamp_clock::duration step_base = 0ms) { auto builder = schema_builder("tests", "test_compaction_strategy_cleanup_method") .with_column("id", utf8_type, column_kind::partition_key) .with_column("cl", int32_type, column_kind::clustering_key) .with_column("value", int32_type); builder.set_compaction_strategy(compaction_strategy_type); + builder.set_compaction_strategy_options(std::move(strategy_options)); auto s = builder.build(); auto tmp = tmpdir(); @@ -4944,18 +4947,24 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { sstables::get_highest_sstable_version(), big); }; - auto make_mutation = [&](unsigned pkey_idx) { + using namespace std::chrono; + auto now = gc_clock::now().time_since_epoch() + duration_cast(seconds(tests::random::get_int(0, 3600*24))); + auto next_timestamp = [&now] (microseconds step) mutable -> api::timestamp_type { + return (now + step).count(); + }; + auto make_mutation = [&] (unsigned pkey_idx, api::timestamp_type ts) { auto pkey = partition_key::from_exploded(*s, {to_bytes(tokens[pkey_idx].first)}); mutation m(s, pkey); auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)}); - m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(1)), gc_clock::now().time_since_epoch().count()); + m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(1)), ts); return m; }; std::vector candidates; candidates.reserve(all_files); for (auto i = 0; i < all_files; i++) { - candidates.push_back(make_sstable_containing(sst_gen, {make_mutation(i)})); + auto current_step = duration_cast(step_base) * i; + candidates.push_back(make_sstable_containing(sst_gen, {make_mutation(i, next_timestamp(current_step))})); } auto strategy = cf->get_compaction_strategy(); @@ -4963,9 +4972,10 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { return std::make_pair(std::move(candidates), std::move(jobs)); }; - auto run_cleanup_strategy_test = [&] (sstables::compaction_strategy_type compaction_strategy_type, size_t per_job_files) { + auto run_cleanup_strategy_test = [&] (sstables::compaction_strategy_type compaction_strategy_type, size_t per_job_files, auto&&... args) { + testlog.info("Running cleanup test for strategy type {}", compaction_strategy::name(compaction_strategy_type)); size_t target_job_count = all_files / per_job_files; - auto [candidates, descriptors] = get_cleanup_jobs(compaction_strategy_type); + auto [candidates, descriptors] = get_cleanup_jobs(compaction_strategy_type, std::forward(args)...); BOOST_REQUIRE(descriptors.size() == target_job_count); auto generations = boost::copy_range>(candidates | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::generation))); auto check_desc = [&] (const auto& desc) { @@ -4984,5 +4994,12 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { // Default implementation: check that it will return one job for each file run_cleanup_strategy_test(sstables::compaction_strategy_type::null, 1); + + // TWCS: Check that it will return one job for each time window + std::map twcs_opts = { + {time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"}, + {time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"}, + }; + run_cleanup_strategy_test(sstables::compaction_strategy_type::time_window, 1, std::move(twcs_opts), 1h); }); }