compaction_manager: Add compaction throughput limit
Re-use eisting compaction_throughput_mb_per_sec option, push it down to compaction manager via config and update the nderlying compaction sched class when the option is (live)updated. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -615,9 +615,17 @@ compaction_manager::compaction_manager(config cfg, abort_source& as)
|
||||
, _early_abort_subscription(as.subscribe([this] () noexcept {
|
||||
do_stop();
|
||||
}))
|
||||
, _throughput_mbs(std::move(cfg.throughput_mb_per_sec))
|
||||
, _strategy_control(std::make_unique<strategy_control>(*this))
|
||||
{
|
||||
register_metrics();
|
||||
// Bandwidth throttling is node-wide, updater is needed on single shard
|
||||
if (this_shard_id() == 0) {
|
||||
_throughput_option_observer.emplace(_throughput_mbs.observe(_throughput_updater.make_observer()));
|
||||
// Start throttling (if configured) right at once. Any boot-time compaction
|
||||
// jobs (reshape/reshard) run in unlimited streaming group
|
||||
(void)_throughput_updater.trigger_later();
|
||||
}
|
||||
}
|
||||
|
||||
compaction_manager::compaction_manager()
|
||||
@@ -638,6 +646,19 @@ compaction_manager::~compaction_manager() {
|
||||
assert(_state == state::none || _state == state::stopped);
|
||||
}
|
||||
|
||||
future<> compaction_manager::update_throughput(uint32_t value_mbs) {
|
||||
uint64_t bps = ((uint64_t)(value_mbs != 0 ? value_mbs : std::numeric_limits<uint32_t>::max())) << 20;
|
||||
return _compaction_sg.io.update_bandwidth(bps).then_wrapped([value_mbs] (auto f) {
|
||||
if (f.failed()) {
|
||||
cmlog.warn("Couldn't update compaction bandwidth: {}", f.get_exception());
|
||||
} else if (value_mbs != 0) {
|
||||
cmlog.info("Set compaction bandwidth to {}MB/s", value_mbs);
|
||||
} else {
|
||||
cmlog.info("Set unlimited compaction bandwidth");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void compaction_manager::register_metrics() {
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
@@ -773,6 +794,7 @@ future<> compaction_manager::really_do_stop() {
|
||||
_weight_tracker.clear();
|
||||
_compaction_submission_timer.cancel();
|
||||
co_await _compaction_controller.shutdown();
|
||||
co_await _throughput_updater.join();
|
||||
cmlog.info("Stopped");
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,8 @@
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
#include "log.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include <vector>
|
||||
#include <list>
|
||||
#include <functional>
|
||||
@@ -54,6 +56,7 @@ public:
|
||||
scheduling_group maintenance_sched_group;
|
||||
size_t available_memory;
|
||||
uint64_t static_shares = 0;
|
||||
utils::updateable_value<uint32_t> throughput_mb_per_sec = utils::updateable_value<uint32_t>(0);
|
||||
};
|
||||
private:
|
||||
struct compaction_state {
|
||||
@@ -289,6 +292,9 @@ private:
|
||||
compaction_backlog_manager _backlog_manager;
|
||||
size_t _available_memory;
|
||||
optimized_optional<abort_source::subscription> _early_abort_subscription;
|
||||
utils::updateable_value<uint32_t> _throughput_mbs;
|
||||
serialized_action _throughput_updater = serialized_action([this] { return update_throughput(_throughput_mbs()); });
|
||||
std::optional<utils::observer<uint32_t>> _throughput_option_observer;
|
||||
|
||||
class strategy_control;
|
||||
std::unique_ptr<strategy_control> _strategy_control;
|
||||
@@ -296,6 +302,7 @@ private:
|
||||
future<> perform_task(shared_ptr<task>);
|
||||
|
||||
future<> stop_tasks(std::vector<shared_ptr<task>> tasks, sstring reason);
|
||||
future<> update_throughput(uint32_t value_mbs);
|
||||
|
||||
// Return the largest fan-in of currently running compactions
|
||||
unsigned current_compaction_fan_in_threshold() const;
|
||||
|
||||
@@ -96,6 +96,7 @@ inline compaction_manager::config make_compaction_manager_config(const db::confi
|
||||
.maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group, service::get_local_streaming_priority()},
|
||||
.available_memory = dbcfg.available_memory,
|
||||
.static_shares = cfg.compaction_static_shares(),
|
||||
.throughput_mb_per_sec = cfg.compaction_throughput_mb_per_sec,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user