reader_concurrency_semaphore: add cpu_concurrency constructor parameter

In the case of the user semaphore, this receives the new
reader_concurrency_semaphore_cpu_limit config item.
Not used yet.
This commit is contained in:
Botond Dénes
2024-05-29 07:58:41 -04:00
parent c7317be09a
commit 59faa6d4ff
4 changed files with 24 additions and 8 deletions

View File

@@ -983,15 +983,23 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
namespace sm = seastar::metrics;
static const sm::label class_label("class");
reader_concurrency_semaphore::reader_concurrency_semaphore(utils::updateable_value<int> count, ssize_t memory, sstring name, size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier, utils::updateable_value<uint32_t> kill_limit_multiplier, register_metrics metrics)
: _initial_resources(count(), memory)
, _resources(count(), memory)
reader_concurrency_semaphore::reader_concurrency_semaphore(
utils::updateable_value<int> count,
ssize_t memory,
sstring name,
size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
register_metrics metrics)
: _initial_resources(count, memory)
, _resources(count, memory)
, _count_observer(count.observe([this] (const int& new_count) { set_resources({new_count, _initial_resources.memory}); }))
, _name(std::move(name))
, _max_queue_length(max_queue_length)
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
, _cpu_concurrency(cpu_concurrency)
{
if (metrics == register_metrics::yes) {
_metrics.emplace();
@@ -1056,6 +1064,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
metrics) {}
reader_concurrency_semaphore::~reader_concurrency_semaphore() {

View File

@@ -189,6 +189,7 @@ private:
size_t _max_queue_length = std::numeric_limits<size_t>::max();
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
utils::updateable_value<uint32_t> _kill_limit_multiplier;
utils::updateable_value<uint32_t> _cpu_concurrency;
stats _stats;
std::optional<seastar::metrics::metric_groups> _metrics;
bool _stopped = false;
@@ -281,6 +282,7 @@ public:
size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
register_metrics metrics);
reader_concurrency_semaphore(
@@ -292,7 +294,7 @@ public:
utils::updateable_value<uint32_t> kill_limit_multiplier,
register_metrics metrics)
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length,
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), metrics)
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), utils::updateable_value<uint32_t>(1), metrics)
{ }
/// Create a semaphore with practically unlimited count and memory.
@@ -311,9 +313,10 @@ public:
size_t max_queue_length = std::numeric_limits<size_t>::max(),
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> cpu_concurrency = utils::updateable_value<uint32_t>(1),
register_metrics metrics = register_metrics::no)
: reader_concurrency_semaphore(utils::updateable_value<uint32_t>(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
std::move(kill_limit_multipler), register_metrics::no)
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
std::move(kill_limit_multipler), std::move(cpu_concurrency), register_metrics::no)
{}
virtual ~reader_concurrency_semaphore();

View File

@@ -330,12 +330,14 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
}
return backlog;
}))
, _read_concurrency_sem(max_count_concurrent_reads,
, _read_concurrency_sem(
utils::updateable_value<int>(max_count_concurrent_reads),
max_memory_concurrent_reads(),
"user",
max_inactive_queue_length(),
_cfg.reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.reader_concurrency_semaphore_cpu_concurrency,
reader_concurrency_semaphore::register_metrics::yes)
// No timeouts or queue length limits - a failure here can kill an entire repair.
// Trust the caller to limit concurrency.
@@ -346,6 +348,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
reader_concurrency_semaphore::register_metrics::yes)
// No limits, just for accounting.
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction", reader_concurrency_semaphore::register_metrics::no)

View File

@@ -2000,6 +2000,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
100,
utils::updateable_value<uint32_t>(serialize_multiplier),
utils::updateable_value<uint32_t>(kill_multiplier),
utils::updateable_value<uint32_t>(1),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);