reader_concurrency_semaphore: add cpu_concurrency constructor parameter
In the case of the user semaphore, this receives the new reader_concurrency_semaphore_cpu_limit config item. Not used yet.
This commit is contained in:
@@ -983,15 +983,23 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
namespace sm = seastar::metrics;
|
||||
static const sm::label class_label("class");
|
||||
|
||||
reader_concurrency_semaphore::reader_concurrency_semaphore(utils::updateable_value<int> count, ssize_t memory, sstring name, size_t max_queue_length,
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier, utils::updateable_value<uint32_t> kill_limit_multiplier, register_metrics metrics)
|
||||
: _initial_resources(count(), memory)
|
||||
, _resources(count(), memory)
|
||||
reader_concurrency_semaphore::reader_concurrency_semaphore(
|
||||
utils::updateable_value<int> count,
|
||||
ssize_t memory,
|
||||
sstring name,
|
||||
size_t max_queue_length,
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> cpu_concurrency,
|
||||
register_metrics metrics)
|
||||
: _initial_resources(count, memory)
|
||||
, _resources(count, memory)
|
||||
, _count_observer(count.observe([this] (const int& new_count) { set_resources({new_count, _initial_resources.memory}); }))
|
||||
, _name(std::move(name))
|
||||
, _max_queue_length(max_queue_length)
|
||||
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
|
||||
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
|
||||
, _cpu_concurrency(cpu_concurrency)
|
||||
{
|
||||
if (metrics == register_metrics::yes) {
|
||||
_metrics.emplace();
|
||||
@@ -1056,6 +1064,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(uint32_t(1)),
|
||||
metrics) {}
|
||||
|
||||
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
|
||||
@@ -189,6 +189,7 @@ private:
|
||||
size_t _max_queue_length = std::numeric_limits<size_t>::max();
|
||||
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
|
||||
utils::updateable_value<uint32_t> _kill_limit_multiplier;
|
||||
utils::updateable_value<uint32_t> _cpu_concurrency;
|
||||
stats _stats;
|
||||
std::optional<seastar::metrics::metric_groups> _metrics;
|
||||
bool _stopped = false;
|
||||
@@ -281,6 +282,7 @@ public:
|
||||
size_t max_queue_length,
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> cpu_concurrency,
|
||||
register_metrics metrics);
|
||||
|
||||
reader_concurrency_semaphore(
|
||||
@@ -292,7 +294,7 @@ public:
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier,
|
||||
register_metrics metrics)
|
||||
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length,
|
||||
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), metrics)
|
||||
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), utils::updateable_value<uint32_t>(1), metrics)
|
||||
{ }
|
||||
|
||||
/// Create a semaphore with practically unlimited count and memory.
|
||||
@@ -311,9 +313,10 @@ public:
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value<uint32_t> cpu_concurrency = utils::updateable_value<uint32_t>(1),
|
||||
register_metrics metrics = register_metrics::no)
|
||||
: reader_concurrency_semaphore(utils::updateable_value<uint32_t>(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
|
||||
std::move(kill_limit_multipler), register_metrics::no)
|
||||
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
|
||||
std::move(kill_limit_multipler), std::move(cpu_concurrency), register_metrics::no)
|
||||
{}
|
||||
|
||||
virtual ~reader_concurrency_semaphore();
|
||||
|
||||
@@ -330,12 +330,14 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
}
|
||||
return backlog;
|
||||
}))
|
||||
, _read_concurrency_sem(max_count_concurrent_reads,
|
||||
, _read_concurrency_sem(
|
||||
utils::updateable_value<int>(max_count_concurrent_reads),
|
||||
max_memory_concurrent_reads(),
|
||||
"user",
|
||||
max_inactive_queue_length(),
|
||||
_cfg.reader_concurrency_semaphore_serialize_limit_multiplier,
|
||||
_cfg.reader_concurrency_semaphore_kill_limit_multiplier,
|
||||
_cfg.reader_concurrency_semaphore_cpu_concurrency,
|
||||
reader_concurrency_semaphore::register_metrics::yes)
|
||||
// No timeouts or queue length limits - a failure here can kill an entire repair.
|
||||
// Trust the caller to limit concurrency.
|
||||
@@ -346,6 +348,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(uint32_t(1)),
|
||||
reader_concurrency_semaphore::register_metrics::yes)
|
||||
// No limits, just for accounting.
|
||||
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction", reader_concurrency_semaphore::register_metrics::no)
|
||||
|
||||
@@ -2000,6 +2000,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
|
||||
100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier),
|
||||
utils::updateable_value<uint32_t>(kill_multiplier),
|
||||
utils::updateable_value<uint32_t>(1),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user