diff --git a/backlog_controller.hh b/backlog_controller.hh index 2a0ef26eb7..ec7eaafa19 100644 --- a/backlog_controller.hh +++ b/backlog_controller.hh @@ -61,30 +61,31 @@ protected: // updating shares for an I/O class may contact another shard and returns a future. future<> _inflight_update; + // Used when the controllers are disabled and a static share is used + // When that option is deprecated we should remove this. + float _static_shares; + virtual void update_controller(float quota); + bool controller_disabled() const noexcept { + return _static_shares > 0; + } + void adjust(); backlog_controller(scheduling_group& sg, std::chrono::milliseconds interval, - std::vector control_points, std::function backlog) + std::vector control_points, std::function backlog, + float static_shares = 0) : _scheduling_group(sg) , _interval(interval) , _update_timer([this] { adjust(); }) , _control_points() , _current_backlog(std::move(backlog)) , _inflight_update(make_ready_future<>()) + , _static_shares(static_shares) { _control_points.insert(_control_points.end(), control_points.begin(), control_points.end()); - _update_timer.arm_periodic(_interval); - } - - // Used when the controllers are disabled and a static share is used - // When that option is deprecated we should remove this. - backlog_controller(scheduling_group& sg, float static_shares) - : _scheduling_group(sg) - , _inflight_update(make_ready_future<>()) - { - update_controller(static_shares); + _update_timer.arm_periodic(_interval); } virtual ~backlog_controller() {} @@ -111,11 +112,11 @@ public: class flush_controller : public backlog_controller { static constexpr float hard_dirty_limit = 1.0f; public: - flush_controller(backlog_controller::scheduling_group& sg, float static_shares) : backlog_controller(sg, static_shares) {} - flush_controller(backlog_controller::scheduling_group& sg, std::chrono::milliseconds interval, float soft_limit, std::function current_dirty) + flush_controller(backlog_controller::scheduling_group& sg, float static_shares, std::chrono::milliseconds interval, float soft_limit, std::function current_dirty) : backlog_controller(sg, std::move(interval), std::vector({{0.0, 0.0}, {soft_limit, 10}, {soft_limit + (hard_dirty_limit - soft_limit) / 2, 200} , {hard_dirty_limit, 1000}}), - std::move(current_dirty) + std::move(current_dirty), + static_shares ) {} }; @@ -125,11 +126,11 @@ public: static constexpr unsigned normalization_factor = 30; static constexpr float disable_backlog = std::numeric_limits::infinity(); static constexpr float backlog_disabled(float backlog) { return std::isinf(backlog); } - compaction_controller(backlog_controller::scheduling_group& sg, float static_shares) : backlog_controller(sg, static_shares) {} - compaction_controller(backlog_controller::scheduling_group& sg, std::chrono::milliseconds interval, std::function current_backlog) + compaction_controller(backlog_controller::scheduling_group& sg, float static_shares, std::chrono::milliseconds interval, std::function current_backlog) : backlog_controller(sg, std::move(interval), std::vector({{0.0, 50}, {1.5, 100} , {normalization_factor, 1000}}), - std::move(current_backlog) + std::move(current_backlog), + static_shares ) {} }; diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 0aa3d6c8bd..3970d503b2 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -505,11 +505,7 @@ std::ostream& operator<<(std::ostream& os, const compaction_manager::task& task) } inline compaction_controller make_compaction_controller(compaction_manager::scheduling_group& csg, uint64_t static_shares, std::function fn) { - if (static_shares > 0) { - return compaction_controller(csg, static_shares); - } - - return compaction_controller(csg, 250ms, std::move(fn)); + return compaction_controller(csg, static_shares, 250ms, std::move(fn)); } std::string compaction_manager::task::describe() const { @@ -657,7 +653,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as) compaction_manager::compaction_manager() : _compaction_sg(scheduling_group{default_scheduling_group(), default_priority_class()}) , _maintenance_sg(scheduling_group{default_scheduling_group(), default_priority_class()}) - , _compaction_controller(_compaction_sg, 1) + , _compaction_controller(make_compaction_controller(_compaction_sg, 1, [] () -> float { return 1.0; })) , _backlog_manager(_compaction_controller) , _available_memory(1) , _strategy_control(std::make_unique(*this)) diff --git a/replica/database.cc b/replica/database.cc index 15d3f0051d..8b808d8604 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -86,10 +86,7 @@ namespace replica { inline flush_controller make_flush_controller(const db::config& cfg, backlog_controller::scheduling_group& sg, std::function fn) { - if (cfg.memtable_flush_static_shares() > 0) { - return flush_controller(sg, cfg.memtable_flush_static_shares()); - } - return flush_controller(sg, 50ms, cfg.virtual_dirty_soft_limit(), std::move(fn)); + return flush_controller(sg, cfg.memtable_flush_static_shares(), 50ms, cfg.virtual_dirty_soft_limit(), std::move(fn)); } inline compaction_manager::config make_compaction_manager_config(const db::config& cfg, database_config& dbcfg) { @@ -405,6 +402,11 @@ const data_dictionary::user_types_storage& database::user_types() const noexcept } // namespace replica void backlog_controller::adjust() { + if (controller_disabled()) { + update_controller(_static_shares); + return; + } + auto backlog = _current_backlog(); if (backlog >= _control_points.back().input) { @@ -427,8 +429,7 @@ void backlog_controller::adjust() { float backlog_controller::backlog_of_shares(float shares) const { size_t idx = 1; - // No control points means the controller is disabled. - if (_control_points.size() == 0) { + if (controller_disabled() || _control_points.size() == 0) { return 1.0f; } while ((idx < _control_points.size() - 1) && (_control_points[idx].output < shares)) {