mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-28 10:41:12 +00:00
compaction: Make compaction_static_shares liveupdateable
This patch makes compaction_static_shares liveupdateable to avoid having to restart the cluster after updating this config. Signed-off-by: Igor Ribeiro Barbosa Duarte <igor.duarte@scylladb.com>
This commit is contained in:
@@ -45,6 +45,12 @@ public:
|
||||
_update_timer.cancel();
|
||||
return std::move(_inflight_update);
|
||||
}
|
||||
|
||||
future<> update_static_shares(float static_shares) {
|
||||
_static_shares = static_shares;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
protected:
|
||||
struct control_point {
|
||||
float input;
|
||||
|
||||
@@ -438,6 +438,11 @@ future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables
|
||||
return perform_task(make_shared<custom_compaction_task>(*this, &t, type, desc, std::move(job)));
|
||||
}
|
||||
|
||||
future<> compaction_manager::update_static_shares(float static_shares) {
|
||||
cmlog.info("Updating static shares to {}", static_shares);
|
||||
return _compaction_controller.update_static_shares(static_shares);
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction::table_state& t)
|
||||
: _cm(cm)
|
||||
, _table(&t)
|
||||
@@ -638,6 +643,9 @@ compaction_manager::compaction_manager(config cfg, abort_source& as)
|
||||
do_stop();
|
||||
}))
|
||||
, _throughput_mbs(std::move(cfg.throughput_mb_per_sec))
|
||||
, _static_shares(std::move(cfg.static_shares))
|
||||
, _update_compaction_static_shares_action([this] { return update_static_shares(_static_shares); })
|
||||
, _compaction_static_shares_observer(_static_shares.observe(_update_compaction_static_shares_action.make_observer()))
|
||||
, _strategy_control(std::make_unique<strategy_control>(*this))
|
||||
{
|
||||
register_metrics();
|
||||
@@ -656,6 +664,9 @@ compaction_manager::compaction_manager()
|
||||
, _compaction_controller(make_compaction_controller(_compaction_sg, 1, [] () -> float { return 1.0; }))
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _available_memory(1)
|
||||
, _static_shares(utils::updateable_value<float>(0))
|
||||
, _update_compaction_static_shares_action([] { return make_ready_future<>(); })
|
||||
, _compaction_static_shares_observer(_static_shares.observe(_update_compaction_static_shares_action.make_observer()))
|
||||
, _strategy_control(std::make_unique<strategy_control>(*this))
|
||||
{
|
||||
// No metric registration because this constructor is supposed to be used only by the testing
|
||||
@@ -817,6 +828,7 @@ future<> compaction_manager::really_do_stop() {
|
||||
_compaction_submission_timer.cancel();
|
||||
co_await _compaction_controller.shutdown();
|
||||
co_await _throughput_updater.join();
|
||||
co_await _update_compaction_static_shares_action.join();
|
||||
cmlog.info("Stopped");
|
||||
}
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ public:
|
||||
scheduling_group compaction_sched_group;
|
||||
scheduling_group maintenance_sched_group;
|
||||
size_t available_memory;
|
||||
uint64_t static_shares = 0;
|
||||
utils::updateable_value<float> static_shares = utils::updateable_value<float>(0);
|
||||
utils::updateable_value<uint32_t> throughput_mb_per_sec = utils::updateable_value<uint32_t>(0);
|
||||
};
|
||||
private:
|
||||
@@ -284,6 +284,9 @@ private:
|
||||
utils::updateable_value<uint32_t> _throughput_mbs;
|
||||
serialized_action _throughput_updater = serialized_action([this] { return update_throughput(_throughput_mbs()); });
|
||||
std::optional<utils::observer<uint32_t>> _throughput_option_observer;
|
||||
utils::updateable_value<float> _static_shares;
|
||||
serialized_action _update_compaction_static_shares_action;
|
||||
utils::observer<float> _compaction_static_shares_observer;
|
||||
|
||||
class strategy_control;
|
||||
std::unique_ptr<strategy_control> _strategy_control;
|
||||
@@ -330,6 +333,7 @@ private:
|
||||
void postpone_compaction_for_table(compaction::table_state* t);
|
||||
|
||||
future<> perform_sstable_scrub_validate_mode(compaction::table_state& t);
|
||||
future<> update_static_shares(float shares);
|
||||
|
||||
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
|
||||
|
||||
|
||||
@@ -279,7 +279,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"true: auto-adjust memtable shares for flush processes")
|
||||
, memtable_flush_static_shares(this, "memtable_flush_static_shares", value_status::Used, 0,
|
||||
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity")
|
||||
, compaction_static_shares(this, "compaction_static_shares", value_status::Used, 0,
|
||||
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
|
||||
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity")
|
||||
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
|
||||
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold")
|
||||
|
||||
@@ -94,7 +94,7 @@ inline compaction_manager::config make_compaction_manager_config(const db::confi
|
||||
.compaction_sched_group = compaction_manager::scheduling_group{dbcfg.compaction_scheduling_group, service::get_local_compaction_priority()},
|
||||
.maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group, service::get_local_streaming_priority()},
|
||||
.available_memory = dbcfg.available_memory,
|
||||
.static_shares = cfg.compaction_static_shares(),
|
||||
.static_shares = cfg.compaction_static_shares,
|
||||
.throughput_mb_per_sec = cfg.compaction_throughput_mb_per_sec,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user