diff --git a/backlog_controller.hh b/backlog_controller.hh index ec7eaafa19..3cf3350229 100644 --- a/backlog_controller.hh +++ b/backlog_controller.hh @@ -45,6 +45,12 @@ public: _update_timer.cancel(); return std::move(_inflight_update); } + + future<> update_static_shares(float static_shares) { + _static_shares = static_shares; + return make_ready_future<>(); + } + protected: struct control_point { float input; diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 3970d503b2..b5a254a85f 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -438,6 +438,11 @@ future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables return perform_task(make_shared(*this, &t, type, desc, std::move(job))); } +future<> compaction_manager::update_static_shares(float static_shares) { + cmlog.info("Updating static shares to {}", static_shares); + return _compaction_controller.update_static_shares(static_shares); +} + compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction::table_state& t) : _cm(cm) , _table(&t) @@ -638,6 +643,9 @@ compaction_manager::compaction_manager(config cfg, abort_source& as) do_stop(); })) , _throughput_mbs(std::move(cfg.throughput_mb_per_sec)) + , _static_shares(std::move(cfg.static_shares)) + , _update_compaction_static_shares_action([this] { return update_static_shares(_static_shares); }) + , _compaction_static_shares_observer(_static_shares.observe(_update_compaction_static_shares_action.make_observer())) , _strategy_control(std::make_unique(*this)) { register_metrics(); @@ -656,6 +664,9 @@ compaction_manager::compaction_manager() , _compaction_controller(make_compaction_controller(_compaction_sg, 1, [] () -> float { return 1.0; })) , _backlog_manager(_compaction_controller) , _available_memory(1) + , _static_shares(utils::updateable_value(0)) + , _update_compaction_static_shares_action([] { return make_ready_future<>(); }) + , _compaction_static_shares_observer(_static_shares.observe(_update_compaction_static_shares_action.make_observer())) , _strategy_control(std::make_unique(*this)) { // No metric registration because this constructor is supposed to be used only by the testing @@ -817,6 +828,7 @@ future<> compaction_manager::really_do_stop() { _compaction_submission_timer.cancel(); co_await _compaction_controller.shutdown(); co_await _throughput_updater.join(); + co_await _update_compaction_static_shares_action.join(); cmlog.info("Stopped"); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index e4b3b2ef8b..067399b3a7 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -51,7 +51,7 @@ public: scheduling_group compaction_sched_group; scheduling_group maintenance_sched_group; size_t available_memory; - uint64_t static_shares = 0; + utils::updateable_value static_shares = utils::updateable_value(0); utils::updateable_value throughput_mb_per_sec = utils::updateable_value(0); }; private: @@ -284,6 +284,9 @@ private: utils::updateable_value _throughput_mbs; serialized_action _throughput_updater = serialized_action([this] { return update_throughput(_throughput_mbs()); }); std::optional> _throughput_option_observer; + utils::updateable_value _static_shares; + serialized_action _update_compaction_static_shares_action; + utils::observer _compaction_static_shares_observer; class strategy_control; std::unique_ptr _strategy_control; @@ -330,6 +333,7 @@ private: void postpone_compaction_for_table(compaction::table_state* t); future<> perform_sstable_scrub_validate_mode(compaction::table_state& t); + future<> update_static_shares(float shares); using get_candidates_func = std::function>()>; diff --git a/db/config.cc b/db/config.cc index c10e1c7bff..1236f18abe 100644 --- a/db/config.cc +++ b/db/config.cc @@ -279,7 +279,7 @@ db::config::config(std::shared_ptr exts) "true: auto-adjust memtable shares for flush processes") , memtable_flush_static_shares(this, "memtable_flush_static_shares", value_status::Used, 0, "If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity") - , compaction_static_shares(this, "compaction_static_shares", value_status::Used, 0, + , compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0, "If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity") , compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false, "If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold") diff --git a/replica/database.cc b/replica/database.cc index 8b808d8604..6770811dbb 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -94,7 +94,7 @@ inline compaction_manager::config make_compaction_manager_config(const db::confi .compaction_sched_group = compaction_manager::scheduling_group{dbcfg.compaction_scheduling_group, service::get_local_compaction_priority()}, .maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group, service::get_local_streaming_priority()}, .available_memory = dbcfg.available_memory, - .static_shares = cfg.compaction_static_shares(), + .static_shares = cfg.compaction_static_shares, .throughput_mb_per_sec = cfg.compaction_throughput_mb_per_sec, }; }