compaction_manager: major_compaction_task: run in maintenance scheduling groupt

We should separate the scheduling groups used for major compaction
from the the regular compaction scheduling group so that
the latter can be affected by the backlog tracker in case
backlog accumulates during a long running major compaction.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-07-06 11:49:41 +03:00
parent a9dc7b1841
commit e3f561db31
5 changed files with 14 additions and 6 deletions

View File

@@ -337,7 +337,7 @@ protected:
// it cannot be the other way around, or minor compaction for this table would be
// prevented while an ongoing major compaction doesn't release the semaphore.
virtual future<> do_run() override {
co_await coroutine::switch_to(_cm._compaction_sg.cpu);
co_await coroutine::switch_to(_cm._maintenance_sg.cpu);
switch_state(state::pending);
auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem);

View File

@@ -36,8 +36,9 @@ logging::logger leveled_manifest::logger("LeveledManifest");
namespace sstables {
compaction_descriptor compaction_strategy_impl::get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates) {
return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority());
compaction_descriptor compaction_strategy_impl::make_major_compaction_job(std::vector<sstables::shared_sstable> candidates, int level, uint64_t max_sstable_bytes) {
// run major compaction in maintenance priority
return compaction_descriptor(std::move(candidates), service::get_local_streaming_priority(), level, max_sstable_bytes);
}
std::vector<compaction_descriptor> compaction_strategy_impl::get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const {

View File

@@ -12,6 +12,7 @@
#include "compaction_backlog_manager.hh"
#include "compaction_strategy.hh"
#include "db_clock.hh"
#include "compaction_descriptor.hh"
namespace compaction {
class table_state;
@@ -23,7 +24,6 @@ namespace sstables {
compaction_backlog_tracker& get_unimplemented_backlog_tracker();
class sstable_set_impl;
class compaction_descriptor;
class resharding_descriptor;
class compaction_strategy_impl {
@@ -43,10 +43,15 @@ public:
protected:
compaction_strategy_impl() = default;
explicit compaction_strategy_impl(const std::map<sstring, sstring>& options);
static compaction_descriptor make_major_compaction_job(std::vector<sstables::shared_sstable> candidates,
int level = compaction_descriptor::default_level,
uint64_t max_sstable_bytes = compaction_descriptor::default_max_sstable_bytes);
public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) = 0;
virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates);
virtual compaction_descriptor get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates) {
return make_major_compaction_job(std::move(candidates));
}
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& table_s, std::vector<shared_sstable> candidates) const;
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) { }
virtual compaction_strategy_type type() const = 0;

View File

@@ -63,7 +63,7 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(tabl
auto max_sstable_size_in_bytes = _max_sstable_size_in_mb*1024*1024;
auto ideal_level = ideal_level_for_input(candidates, max_sstable_size_in_bytes);
return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority(),
return make_major_compaction_job(std::move(candidates),
ideal_level, max_sstable_size_in_bytes);
}

View File

@@ -3163,6 +3163,7 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
auto descriptor = cs.get_major_compaction_job(*table_s, candidates);
BOOST_REQUIRE(descriptor.sstables.size() == candidates.size());
BOOST_REQUIRE(uint32_t(descriptor.level) == leveled_compaction_strategy::ideal_level_for_input(candidates, 160*1024*1024));
BOOST_REQUIRE(descriptor.io_priority.id() == service::get_local_streaming_priority().id());
}
{
@@ -3170,6 +3171,7 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) {
auto descriptor = cs.get_major_compaction_job(*table_s, candidates);
BOOST_REQUIRE(descriptor.sstables.size() == candidates.size());
BOOST_REQUIRE(descriptor.level == 0);
BOOST_REQUIRE(descriptor.io_priority.id() == service::get_local_streaming_priority().id());
}
});
}