compaction_manager: Add compaction throughput limit

Re-use eisting compaction_throughput_mb_per_sec option, push it down to
compaction manager via config and update the nderlying compaction sched
class when the option is (live)updated.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-06-20 15:25:06 +03:00
parent b86d11cf67
commit b112a98318
3 changed files with 30 additions and 0 deletions

View File

@@ -615,9 +615,17 @@ compaction_manager::compaction_manager(config cfg, abort_source& as)
, _early_abort_subscription(as.subscribe([this] () noexcept {
do_stop();
}))
, _throughput_mbs(std::move(cfg.throughput_mb_per_sec))
, _strategy_control(std::make_unique<strategy_control>(*this))
{
register_metrics();
// Bandwidth throttling is node-wide, updater is needed on single shard
if (this_shard_id() == 0) {
_throughput_option_observer.emplace(_throughput_mbs.observe(_throughput_updater.make_observer()));
// Start throttling (if configured) right at once. Any boot-time compaction
// jobs (reshape/reshard) run in unlimited streaming group
(void)_throughput_updater.trigger_later();
}
}
compaction_manager::compaction_manager()
@@ -638,6 +646,19 @@ compaction_manager::~compaction_manager() {
assert(_state == state::none || _state == state::stopped);
}
future<> compaction_manager::update_throughput(uint32_t value_mbs) {
uint64_t bps = ((uint64_t)(value_mbs != 0 ? value_mbs : std::numeric_limits<uint32_t>::max())) << 20;
return _compaction_sg.io.update_bandwidth(bps).then_wrapped([value_mbs] (auto f) {
if (f.failed()) {
cmlog.warn("Couldn't update compaction bandwidth: {}", f.get_exception());
} else if (value_mbs != 0) {
cmlog.info("Set compaction bandwidth to {}MB/s", value_mbs);
} else {
cmlog.info("Set unlimited compaction bandwidth");
}
});
}
void compaction_manager::register_metrics() {
namespace sm = seastar::metrics;
@@ -773,6 +794,7 @@ future<> compaction_manager::really_do_stop() {
_weight_tracker.clear();
_compaction_submission_timer.cancel();
co_await _compaction_controller.shutdown();
co_await _throughput_updater.join();
cmlog.info("Stopped");
}

View File

@@ -20,6 +20,8 @@
#include <seastar/core/condition-variable.hh>
#include "log.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/updateable_value.hh"
#include "utils/serialized_action.hh"
#include <vector>
#include <list>
#include <functional>
@@ -54,6 +56,7 @@ public:
scheduling_group maintenance_sched_group;
size_t available_memory;
uint64_t static_shares = 0;
utils::updateable_value<uint32_t> throughput_mb_per_sec = utils::updateable_value<uint32_t>(0);
};
private:
struct compaction_state {
@@ -289,6 +292,9 @@ private:
compaction_backlog_manager _backlog_manager;
size_t _available_memory;
optimized_optional<abort_source::subscription> _early_abort_subscription;
utils::updateable_value<uint32_t> _throughput_mbs;
serialized_action _throughput_updater = serialized_action([this] { return update_throughput(_throughput_mbs()); });
std::optional<utils::observer<uint32_t>> _throughput_option_observer;
class strategy_control;
std::unique_ptr<strategy_control> _strategy_control;
@@ -296,6 +302,7 @@ private:
future<> perform_task(shared_ptr<task>);
future<> stop_tasks(std::vector<shared_ptr<task>> tasks, sstring reason);
future<> update_throughput(uint32_t value_mbs);
// Return the largest fan-in of currently running compactions
unsigned current_compaction_fan_in_threshold() const;

View File

@@ -96,6 +96,7 @@ inline compaction_manager::config make_compaction_manager_config(const db::confi
.maintenance_sched_group = compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group, service::get_local_streaming_priority()},
.available_memory = dbcfg.available_memory,
.static_shares = cfg.compaction_static_shares(),
.throughput_mb_per_sec = cfg.compaction_throughput_mb_per_sec,
};
}