Files
scylladb/reader_concurrency_semaphore_group.hh
Kefu Chai 7ff0d7ba98 tree: Remove unused boost headers
This commit eliminates unused boost header includes from the tree.

Removing these unnecessary includes reduces dependencies on the
external Boost.Adapters library, leading to faster compile times
and a slightly cleaner codebase.

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>

Closes scylladb/scylladb#22857
2025-02-15 20:32:22 +02:00

89 lines
3.9 KiB
C++

/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
/*
* Copyright (C) 2021-present ScyllaDB
*/
#pragma once
#include <unordered_map>
#include <optional>
#include "reader_concurrency_semaphore.hh"
// The reader_concurrency_semaphore_group is a group of semaphores that share a common pool of memory.
// The memory is dynamically divided between them according to the relative slice of shares each
// semaphore is given.
// All of the mutating operations on the group are asynchronous and serialized. The semaphores are
// created and managed by the group.
class reader_concurrency_semaphore_group {
    // Size of the memory pool of the group (constructor argument `memory`).
    size_t _total_memory;
    // Starts at 0. NOTE(review): presumably the sum of the weights of all semaphores in
    // the group; the code that maintains it is not in this header.
    size_t _total_weight;
    // Constructor arguments `max_concurrent_reads` / `max_queue_length`.
    // NOTE(review): presumably handed to every semaphore the group creates -- confirm
    // against the out-of-line definitions.
    size_t _max_concurrent_reads;
    size_t _max_queue_length;
    // Live-updateable settings received in the constructor.
    // NOTE(review): presumably forwarded to each semaphore created by the group.
    utils::updateable_value<uint32_t> _serialize_limit_multiplier;
    utils::updateable_value<uint32_t> _kill_limit_multiplier;
    utils::updateable_value<uint32_t> _cpu_concurrency;

    // Gives the test wrapper access to the private members.
    friend class database_test_wrapper;

    // A semaphore bundled with the bookkeeping the group keeps for it.
    struct weighted_reader_concurrency_semaphore {
        // Shares assigned to this semaphore.
        size_t weight;
        // Starts at 0. NOTE(review): presumably this semaphore's current slice of the
        // group's memory; it is updated outside this header.
        ssize_t memory_share;
        reader_concurrency_semaphore sem;

        // Builds the wrapped semaphore with metrics registration enabled.
        //
        // shares            initial value of `weight`
        // count             wrapped in a utils::updateable_value and passed as the first
        //                   argument of the semaphore constructor
        // name              passed on to the semaphore
        // max_queue_length, serialize_limit_multiplier, kill_limit_multiplier,
        // cpu_concurrency   passed on to the semaphore unchanged
        //
        // The literal 0 passed as the second semaphore argument is presumably its initial
        // memory (matching `memory_share(0)`) -- confirm against reader_concurrency_semaphore.
        weighted_reader_concurrency_semaphore(size_t shares, int count, sstring name, size_t max_queue_length,
                utils::updateable_value<uint32_t> serialize_limit_multiplier,
                utils::updateable_value<uint32_t> kill_limit_multiplier,
                utils::updateable_value<uint32_t> cpu_concurrency)
            : weight(shares)
            , memory_share(0)
            // `name` is a by-value parameter that is not used again, so it is moved like
            // the other by-value arguments instead of being copied.
            , sem(utils::updateable_value(count), 0, std::move(name), max_queue_length, std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier),
                    std::move(cpu_concurrency), reader_concurrency_semaphore::register_metrics::yes) {}
    };

    // The semaphores of the group, keyed by scheduling group.
    std::unordered_map<scheduling_group, weighted_reader_concurrency_semaphore> _semaphores;
    // Initialized with a single unit. NOTE(review): presumably what serializes the
    // mutating operations on the group; its users are not in this header.
    seastar::semaphore _operations_serializer;
    // Optional prefix supplied at construction.
    // NOTE(review): presumably prepended to the names of the created semaphores.
    std::optional<sstring> _name_prefix;

    // Defined out of line.
    future<> change_weight(weighted_reader_concurrency_semaphore& sem, size_t new_weight);
public:
    // Creates an empty group (no semaphores, total weight 0).
    //
    // memory                      size of the group's memory pool
    // max_concurrent_reads        stored in _max_concurrent_reads
    // max_queue_length            stored in _max_queue_length
    // serialize_limit_multiplier,
    // kill_limit_multiplier,
    // cpu_concurrency             live-updateable settings, stored by the group
    // name_prefix                 optional, defaults to std::nullopt
    reader_concurrency_semaphore_group(size_t memory, size_t max_concurrent_reads, size_t max_queue_length,
            utils::updateable_value<uint32_t> serialize_limit_multiplier,
            utils::updateable_value<uint32_t> kill_limit_multiplier,
            utils::updateable_value<uint32_t> cpu_concurrency,
            std::optional<sstring> name_prefix = std::nullopt)
        : _total_memory(memory)
        , _total_weight(0)
        , _max_concurrent_reads(max_concurrent_reads)
        , _max_queue_length(max_queue_length)
        , _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
        , _kill_limit_multiplier(std::move(kill_limit_multiplier))
        , _cpu_concurrency(std::move(cpu_concurrency))
        , _operations_serializer(1)
        , _name_prefix(std::move(name_prefix)) { }

    // The group must not hold any semaphore when it is destroyed.
    // NOTE(review): presumably stop() is what empties it -- confirm.
    ~reader_concurrency_semaphore_group() {
        assert(_semaphores.empty());
    }

    // The members below are only declared here; their definitions live outside this header.
    future<> adjust();
    future<> wait_adjust_complete();
    future<> stop() noexcept;
    reader_concurrency_semaphore& get(scheduling_group sg);
    reader_concurrency_semaphore* get_or_null(scheduling_group sg);
    reader_concurrency_semaphore& add_or_update(scheduling_group sg, size_t shares);
    future<> remove(scheduling_group sg);
    size_t size();
    void foreach_semaphore(std::function<void(scheduling_group, reader_concurrency_semaphore&)> func);
    future<> foreach_semaphore_async(std::function<future<> (scheduling_group, reader_concurrency_semaphore&)> func);

    // Invokes `member` (via std::invoke) on every semaphore of the group and returns the
    // sum of the results. The sum is folded with std::plus, starting from a
    // zero-initialized value of the type `member` returns.
    auto sum_read_concurrency_sem_var(std::invocable<reader_concurrency_semaphore&> auto member) {
        using ret_type = std::invoke_result_t<decltype(member), reader_concurrency_semaphore&>;
        return std::ranges::fold_left(_semaphores | std::views::values | std::views::transform([=] (weighted_reader_concurrency_semaphore& wrcs) { return std::invoke(member, wrcs.sem); }), ret_type(0), std::plus{});
    }
};