reader_concurrency_semaphore: add waiters counter

Use it to keep track of all permits that are currently waiting on
something: admission, memory or execution.
Currently we keep track of size, by adding up the result of size() of
the various queues. In future patches we are going to change the queues
such that they will not have constant time size anymore, move to an
explicit counter in preperation to that.
Another change this commit makes is to also include ready list entries
in this counter. Permits in the ready list are also waiters, they wait
to be executed. Soon we will have a separate wait state for this too.
This commit is contained in:
Botond Dénes
2023-02-02 07:02:45 -05:00
parent 2694aa1078
commit 74a5981dbe
4 changed files with 15 additions and 8 deletions

View File

@@ -690,6 +690,7 @@ static void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaph
void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept {
e.pr.set_exception(named_semaphore_timed_out(_semaphore._name));
--_semaphore._stats.waiters;
maybe_dump_reader_permit_diagnostics(_semaphore, "timed out");
}
@@ -729,6 +730,7 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept {
while (!_ready_list.empty()) {
auto e = _ready_list.pop();
--_stats.waiters;
try {
e.func(std::move(e.permit)).forward_to(std::move(e.pr));
@@ -798,6 +800,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
utils::updateable_value(std::numeric_limits<uint32_t>::max())) {}
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
assert(!_stats.waiters);
if (!_stats.total_permits) {
// We allow destroy without stop() when the semaphore wasn't used at all yet.
return;
@@ -993,7 +996,7 @@ bool reader_concurrency_semaphore::all_used_permits_are_stalled() const {
}
std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_view queue_name) {
if ((_wait_list.size() + _ready_list.size()) >= _max_queue_length) {
if (_stats.waiters >= _max_queue_length) {
_stats.total_reads_shed_due_to_overload++;
maybe_dump_reader_permit_diagnostics(*this, fmt::format("{} queue overload", queue_name));
return std::make_exception_ptr(std::runtime_error(format("{}: {} queue overload", _name, queue_name)));
@@ -1018,6 +1021,7 @@ future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read
_wait_list.push_to_memory_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
++_stats.reads_enqueued_for_memory;
}
++_stats.waiters;
return fut;
}
@@ -1125,8 +1129,10 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
}
if (x.func) {
_ready_list.push(std::move(x));
// permit is just transferred to another queue, no need to update waiters counter
} else {
x.pr.set_value();
--_stats.waiters;
}
} catch (...) {
x.pr.set_exception(std::current_exception());
@@ -1237,6 +1243,7 @@ future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit, r
promise<> pr;
auto fut = pr.get_future();
_ready_list.push(entry(std::move(pr), std::move(permit), std::move(func)));
++_stats.waiters;
return fut;
}
@@ -1254,6 +1261,7 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
while (!_wait_list.empty()) {
_wait_list.front().pr.set_exception(ex);
_wait_list.pop_front();
--_stats.waiters;
}
}

View File

@@ -107,6 +107,8 @@ public:
uint64_t disk_reads = 0;
// The number of sstables read currently.
uint64_t sstables_read = 0;
// Permits waiting on something: admission, memory or execution
uint64_t waiters = 0;
};
using permit_list_type = bi::list<
@@ -204,9 +206,6 @@ private:
expiring_fifo<entry, expiry_handler, db::timeout_clock> _memory_queue;
public:
wait_queue(expiry_handler eh) : _admission_queue(eh), _memory_queue(eh) { }
size_t size() const {
return _admission_queue.size() + _memory_queue.size();
}
bool empty() const {
return _admission_queue.empty() && _memory_queue.empty();
}
@@ -508,7 +507,7 @@ public:
}
size_t waiters() const {
return _wait_list.size();
return _stats.waiters;
}
void broken(std::exception_ptr ex = {});

View File

@@ -2043,7 +2043,7 @@ class scylla_memory(gdb.Command):
used_count = initial_count - int(semaphore["_resources"]["count"])
used_memory = initial_memory - int(semaphore["_resources"]["memory"])
try:
waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"])
waiters = int(semaphore["_stats"]["waiters"])
except gdb.error: # 5.1 compatibility
waiters = int(semaphore["_wait_list"]["_size"])
return f'{semaphore_name:<16} {used_count:>3}/{initial_count:>3}, {used_memory:>13}/{initial_memory:>13}, queued: {waiters}'
@@ -5235,7 +5235,7 @@ class scylla_read_stats(gdb.Command):
try:
waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"])
waiters = int(semaphore["_stats"]["waiters"])
except gdb.error: # 5.1 compatibility
waiters = int(semaphore["_wait_list"]["_size"])

View File

@@ -1021,7 +1021,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
// * 1 waiter
// * no more count resources left
auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout);
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 2); // (waiters includes _ready_list entries)
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0);