diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 8c5dd188f0..fb7fca0ccf 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -337,7 +337,7 @@ protected: // it cannot be the other way around, or minor compaction for this table would be // prevented while an ongoing major compaction doesn't release the semaphore. virtual future<> do_run() override { - co_await coroutine::switch_to(_cm._compaction_sg.cpu); + co_await coroutine::switch_to(_cm._maintenance_sg.cpu); switch_state(state::pending); auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem); diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index ef7af9d118..65e9e3d12a 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -36,8 +36,9 @@ logging::logger leveled_manifest::logger("LeveledManifest"); namespace sstables { -compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_state& table_s, std::vector candidates) { - return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority()); +compaction_descriptor compaction_strategy_impl::make_major_compaction_job(std::vector candidates, int level, uint64_t max_sstable_bytes) { + // run major compaction in maintenance priority + return compaction_descriptor(std::move(candidates), service::get_local_streaming_priority(), level, max_sstable_bytes); } std::vector compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const { diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index e9573622cd..f97f27ae5e 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -12,6 +12,7 @@ #include "compaction_backlog_manager.hh" #include "compaction_strategy.hh" #include "db_clock.hh" +#include "compaction_descriptor.hh" namespace compaction { class table_state; @@ -23,7 +24,6 @@ namespace sstables { compaction_backlog_tracker& get_unimplemented_backlog_tracker(); class sstable_set_impl; -class compaction_descriptor; class resharding_descriptor; class compaction_strategy_impl { @@ -43,10 +43,15 @@ public: protected: compaction_strategy_impl() = default; explicit compaction_strategy_impl(const std::map& options); + static compaction_descriptor make_major_compaction_job(std::vector candidates, + int level = compaction_descriptor::default_level, + uint64_t max_sstable_bytes = compaction_descriptor::default_max_sstable_bytes); public: virtual ~compaction_strategy_impl() {} virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector candidates) = 0; - virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector candidates); + virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector candidates) { + return make_major_compaction_job(std::move(candidates)); + } virtual std::vector get_cleanup_compaction_jobs(table_state& table_s, std::vector candidates) const; virtual void notify_completion(const std::vector& removed, const std::vector& added) { } virtual compaction_strategy_type type() const = 0; diff --git a/compaction/leveled_compaction_strategy.cc b/compaction/leveled_compaction_strategy.cc index 2e0aa9a995..03f19e3c72 100644 --- a/compaction/leveled_compaction_strategy.cc +++ b/compaction/leveled_compaction_strategy.cc @@ -63,7 +63,7 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(tabl auto max_sstable_size_in_bytes = _max_sstable_size_in_mb*1024*1024; auto ideal_level = ideal_level_for_input(candidates, max_sstable_size_in_bytes); - return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority(), + return make_major_compaction_job(std::move(candidates), ideal_level, max_sstable_size_in_bytes); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 727a277ece..b8d7387358 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3163,6 +3163,7 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) { auto descriptor = cs.get_major_compaction_job(*table_s, candidates); BOOST_REQUIRE(descriptor.sstables.size() == candidates.size()); BOOST_REQUIRE(uint32_t(descriptor.level) == leveled_compaction_strategy::ideal_level_for_input(candidates, 160*1024*1024)); + BOOST_REQUIRE(descriptor.io_priority.id() == service::get_local_streaming_priority().id()); } { @@ -3170,6 +3171,7 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) { auto descriptor = cs.get_major_compaction_job(*table_s, candidates); BOOST_REQUIRE(descriptor.sstables.size() == candidates.size()); BOOST_REQUIRE(descriptor.level == 0); + BOOST_REQUIRE(descriptor.io_priority.id() == service::get_local_streaming_priority().id()); } }); }