Files
scylladb/reader_concurrency_semaphore_group.cc
Kefu Chai 569f8e9246 treewide: fix misspellings
these misspellings were identified by codespell. let's fix them.

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>

Closes scylladb/scylladb#22154
2025-01-05 16:13:09 +02:00

113 lines
4.0 KiB
C++

/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "reader_concurrency_semaphore_group.hh"
// Re-distributes _total_memory between the semaphores of the group, giving
// each one a share proportional to its weight relative to _total_weight, and
// sets the first resource field of every semaphore to _max_concurrent_reads.
//
// The body runs while holding one unit of _operations_serializer: two
// adjustments must not run simultaneously, if they did the behaviour would be
// undefined.
//
// When the group holds no semaphores (for example remove() extracted the last
// one and then re-adjusts through change_weight()) there is nothing to
// distribute to and the adjustment is a no-op.
future<> reader_concurrency_semaphore_group::adjust() {
    return with_semaphore(_operations_serializer, 1, [this] () {
        if (_semaphores.empty()) {
            // Without this guard the remainder step below would dereference
            // begin() == end(), which is undefined behaviour.
            return;
        }
        ssize_t distributed_memory = 0;
        for (auto& [sg, wsem] : _semaphores) {
            // NOTE(review): if _total_weight is 0 while semaphores are still
            // registered, this division yields NaN and the conversion to
            // ssize_t is undefined -- confirm callers never leave the group
            // in that state.
            const ssize_t memory_share = std::floor((double(wsem.weight) / double(_total_weight)) * _total_memory);
            wsem.sem.set_resources({_max_concurrent_reads, memory_share});
            distributed_memory += memory_share;
        }
        // Slap the remainder (lost to rounding down above) on one of the semaphores.
        // This will be a few bytes, doesn't matter where we add it.
        auto& sem = _semaphores.begin()->second.sem;
        sem.set_resources(sem.initial_resources() + reader_resources{0, _total_memory - distributed_memory});
    });
}
// Sets the weight of `sem` to `new_weight` and applies the same delta to
// _total_weight.
//
// If the weight does not change, a ready future is returned and adjust() is
// not called. Otherwise the returned future is the one produced by adjust().
// Note that the weight fields are updated right away in this call; only the
// re-distribution done by adjust() is serialized.
future<> reader_concurrency_semaphore_group::change_weight(weighted_reader_concurrency_semaphore& sem, size_t new_weight) {
    // The difference is computed in unsigned arithmetic (new_weight is a
    // size_t), so a decrease is represented by wrap-around; adding it back
    // below wraps again to the intended value.
    const auto delta = new_weight - sem.weight;
    if (!delta) {
        return make_ready_future<>();
    }
    _total_weight += delta;
    sem.weight += delta;
    return adjust();
}
// Resolves once one unit of _operations_serializer could be acquired; the
// unit is released again immediately. Since adjust() does its work while
// holding a unit of the same semaphore, this presumably means that the
// adjustments queued before this call have finished.
future<> reader_concurrency_semaphore_group::wait_adjust_complete() {
    [[maybe_unused]] auto serializer_units = co_await get_units(_operations_serializer, 1);
    co_return;
}
// Calls stop() on every semaphore of the group via parallel_for_each() and,
// once all of them have resolved successfully, empties _semaphores.
// If stopping fails, the map is left untouched and the failure is propagated
// through the returned future.
future<> reader_concurrency_semaphore_group::stop() noexcept {
    co_await parallel_for_each(_semaphores, [] (auto&& entry) {
        return entry.second.sem.stop();
    });
    _semaphores.clear();
}
// Returns the semaphore registered for `sg`.
// The lookup uses _semaphores.at(), so a scheduling group without an entry
// results in whatever at() throws (std::out_of_range for standard maps).
reader_concurrency_semaphore& reader_concurrency_semaphore_group::get(scheduling_group sg) {
    auto& entry = _semaphores.at(sg);
    return entry.sem;
}
// Non-throwing counterpart of get(): returns a pointer to the semaphore
// registered for `sg`, or nullptr when the group has no entry for it.
reader_concurrency_semaphore* reader_concurrency_semaphore_group::get_or_null(scheduling_group sg) {
    if (auto pos = _semaphores.find(sg); pos != _semaphores.end()) {
        return &pos->second.sem;
    }
    return nullptr;
}
// Returns the semaphore belonging to `sg`, creating it first if the group has
// no entry for that scheduling group yet (try_emplace leaves an existing
// entry untouched), and sets its weight to `shares`.
//
// A newly created semaphore is named "<prefix>_<sg name>" when _name_prefix
// is set, and after the scheduling group's name otherwise.
//
// The future returned by change_weight() is deliberately discarded, so the
// resulting resource adjustment is not waited for by this function.
reader_concurrency_semaphore& reader_concurrency_semaphore_group::add_or_update(scheduling_group sg, size_t shares) {
    const auto pos = _semaphores.try_emplace(
        sg,
        0,
        _max_concurrent_reads,
        _name_prefix ? format("{}_{}", *_name_prefix, sg.name()) : sg.name(),
        _max_queue_length,
        _serialize_limit_multiplier,
        _kill_limit_multiplier,
        _cpu_concurrency
    ).first;
    auto& wsem = pos->second;
    // Since we serialize all group adjustments, this weight change will be
    // queued and no further adjustment will be executed until it ends.
    (void)change_weight(wsem, shares);
    return wsem.sem;
}
// Detaches the semaphore of `sg` from the group, stops it, and then sets its
// weight to 0 through change_weight().
// Resolves immediately when the group has no entry for `sg`.
future<> reader_concurrency_semaphore_group::remove(scheduling_group sg) {
    auto detached = _semaphores.extract(sg);
    if (detached.empty()) {
        return make_ready_future();
    }
    // The reference points into the extracted node, which is owned by the
    // node handle from here on.
    weighted_reader_concurrency_semaphore& wsem = detached.mapped();
    return wsem.sem.stop().then([this, &wsem] {
        return change_weight(wsem, 0);
    }).finally([keep_alive = std::move(detached)] () {
        // Nothing to do here: the capture keeps the node handle (and with it
        // the semaphore) alive until the chain above has finished, so it is
        // only destroyed after the semaphore was stopped properly.
    });
}
// Returns the number of semaphores currently registered in the group.
size_t reader_concurrency_semaphore_group::size() {
    const size_t registered = _semaphores.size();
    return registered;
}
// Synchronously invokes `func` once for every entry of the group, passing the
// scheduling group and a reference to its semaphore.
void reader_concurrency_semaphore_group::foreach_semaphore(std::function<void(scheduling_group, reader_concurrency_semaphore&)> func) {
    for (auto& entry : _semaphores) {
        func(entry.first, entry.second.sem);
    }
}
// Asynchronous variant of foreach_semaphore(): invokes `func` for the entries
// of the group one after another, waiting for each returned future before
// moving on to the next entry.
//
// One unit of _operations_serializer is held for the whole iteration, which
// keeps adjust() from running in between.
// NOTE(review): add_or_update() and remove() modify _semaphores without
// taking the serializer, so they are not excluded by these units -- confirm
// that callers do not modify the group while this iteration is suspended.
future<>
reader_concurrency_semaphore_group::foreach_semaphore_async(std::function<future<> (scheduling_group, reader_concurrency_semaphore&)> func) {
    auto serializer_units = co_await get_units(_operations_serializer, 1);
    for (auto& entry : _semaphores) {
        co_await func(entry.first, entry.second.sem);
    }
}