compaction_manager: major_compaction_task: run in maintenance scheduling groupt
We should separate the scheduling groups used for major compaction from the the regular compaction scheduling group so that the latter can be affected by the backlog tracker in case backlog accumulates during a long running major compaction. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -337,7 +337,7 @@ protected:
|
||||
// it cannot be the other way around, or minor compaction for this table would be
|
||||
// prevented while an ongoing major compaction doesn't release the semaphore.
|
||||
virtual future<> do_run() override {
|
||||
co_await coroutine::switch_to(_cm._compaction_sg.cpu);
|
||||
co_await coroutine::switch_to(_cm._maintenance_sg.cpu);
|
||||
|
||||
switch_state(state::pending);
|
||||
auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem);
|
||||
|
||||
@@ -36,8 +36,9 @@ logging::logger leveled_manifest::logger("LeveledManifest");
|
||||
|
||||
namespace sstables {
|
||||
|
||||
compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates) {
|
||||
return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority());
|
||||
compaction_descriptor compaction_strategy_impl::make_major_compaction_job(std::vector<sstables::shared_sstable> candidates, int level, uint64_t max_sstable_bytes) {
|
||||
// run major compaction in maintenance priority
|
||||
return compaction_descriptor(std::move(candidates), service::get_local_streaming_priority(), level, max_sstable_bytes);
|
||||
}
|
||||
|
||||
std::vector<compaction_descriptor> compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const {
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "compaction_backlog_manager.hh"
|
||||
#include "compaction_strategy.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "compaction_descriptor.hh"
|
||||
|
||||
namespace compaction {
|
||||
class table_state;
|
||||
@@ -23,7 +24,6 @@ namespace sstables {
|
||||
compaction_backlog_tracker& get_unimplemented_backlog_tracker();
|
||||
|
||||
class sstable_set_impl;
|
||||
class compaction_descriptor;
|
||||
class resharding_descriptor;
|
||||
|
||||
class compaction_strategy_impl {
|
||||
@@ -43,10 +43,15 @@ public:
|
||||
protected:
|
||||
compaction_strategy_impl() = default;
|
||||
explicit compaction_strategy_impl(const std::map<sstring, sstring>& options);
|
||||
static compaction_descriptor make_major_compaction_job(std::vector<sstables::shared_sstable> candidates,
|
||||
int level = compaction_descriptor::default_level,
|
||||
uint64_t max_sstable_bytes = compaction_descriptor::default_max_sstable_bytes);
|
||||
public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates);
|
||||
virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates) {
|
||||
return make_major_compaction_job(std::move(candidates));
|
||||
}
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const;
|
||||
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) { }
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
|
||||
@@ -63,7 +63,7 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(tabl
|
||||
|
||||
auto max_sstable_size_in_bytes = _max_sstable_size_in_mb*1024*1024;
|
||||
auto ideal_level = ideal_level_for_input(candidates, max_sstable_size_in_bytes);
|
||||
return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority(),
|
||||
return make_major_compaction_job(std::move(candidates),
|
||||
ideal_level, max_sstable_size_in_bytes);
|
||||
}
|
||||
|
||||
|
||||
@@ -3163,6 +3163,7 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
|
||||
auto descriptor = cs.get_major_compaction_job(*table_s, candidates);
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == candidates.size());
|
||||
BOOST_REQUIRE(uint32_t(descriptor.level) == leveled_compaction_strategy::ideal_level_for_input(candidates, 160*1024*1024));
|
||||
BOOST_REQUIRE(descriptor.io_priority.id() == service::get_local_streaming_priority().id());
|
||||
}
|
||||
|
||||
{
|
||||
@@ -3170,6 +3171,7 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
|
||||
auto descriptor = cs.get_major_compaction_job(*table_s, candidates);
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == candidates.size());
|
||||
BOOST_REQUIRE(descriptor.level == 0);
|
||||
BOOST_REQUIRE(descriptor.io_priority.id() == service::get_local_streaming_priority().id());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user