From 74a5981dbee2e46043ceff1fecacc853e846cb72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 2 Feb 2023 07:02:45 -0500 Subject: [PATCH] reader_concurrency_semaphore: add waiters counter Use it to keep track of all permits that are currently waiting on something: admission, memory or execution. Currently we keep track of size, by adding up the result of size() of the various queues. In future patches we are going to change the queues such that they will not have constant time size anymore, move to an explicit counter in preperation to that. Another change this commit makes is to also include ready list entries in this counter. Permits in the ready list are also waiters, they wait to be executed. Soon we will have a separate wait state for this too. --- reader_concurrency_semaphore.cc | 10 +++++++++- reader_concurrency_semaphore.hh | 7 +++---- scylla-gdb.py | 4 ++-- test/boost/reader_concurrency_semaphore_test.cc | 2 +- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 06a0907099..bc06c9eecc 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -690,6 +690,7 @@ static void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaph void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept { e.pr.set_exception(named_semaphore_timed_out(_semaphore._name)); + --_semaphore._stats.waiters; maybe_dump_reader_permit_diagnostics(_semaphore, "timed out"); } @@ -729,6 +730,7 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept { while (!_ready_list.empty()) { auto e = _ready_list.pop(); + --_stats.waiters; try { e.func(std::move(e.permit)).forward_to(std::move(e.pr)); @@ -798,6 +800,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na utils::updateable_value(std::numeric_limits::max())) {} reader_concurrency_semaphore::~reader_concurrency_semaphore() { + assert(!_stats.waiters); if (!_stats.total_permits) { // We allow destroy without stop() when the semaphore wasn't used at all yet. return; @@ -993,7 +996,7 @@ bool reader_concurrency_semaphore::all_used_permits_are_stalled() const { } std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_view queue_name) { - if ((_wait_list.size() + _ready_list.size()) >= _max_queue_length) { + if (_stats.waiters >= _max_queue_length) { _stats.total_reads_shed_due_to_overload++; maybe_dump_reader_permit_diagnostics(*this, fmt::format("{} queue overload", queue_name)); return std::make_exception_ptr(std::runtime_error(format("{}: {} queue overload", _name, queue_name))); @@ -1018,6 +1021,7 @@ future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read _wait_list.push_to_memory_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout); ++_stats.reads_enqueued_for_memory; } + ++_stats.waiters; return fut; } @@ -1125,8 +1129,10 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { } if (x.func) { _ready_list.push(std::move(x)); + // permit is just transferred to another queue, no need to update waiters counter } else { x.pr.set_value(); + --_stats.waiters; } } catch (...) { x.pr.set_exception(std::current_exception()); @@ -1237,6 +1243,7 @@ future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit, r promise<> pr; auto fut = pr.get_future(); _ready_list.push(entry(std::move(pr), std::move(permit), std::move(func))); + ++_stats.waiters; return fut; } @@ -1254,6 +1261,7 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) { while (!_wait_list.empty()) { _wait_list.front().pr.set_exception(ex); _wait_list.pop_front(); + --_stats.waiters; } } diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 609aedb7de..9a31f83cdc 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -107,6 +107,8 @@ public: uint64_t disk_reads = 0; // The number of sstables read currently. uint64_t sstables_read = 0; + // Permits waiting on something: admission, memory or execution + uint64_t waiters = 0; }; using permit_list_type = bi::list< @@ -204,9 +206,6 @@ private: expiring_fifo _memory_queue; public: wait_queue(expiry_handler eh) : _admission_queue(eh), _memory_queue(eh) { } - size_t size() const { - return _admission_queue.size() + _memory_queue.size(); - } bool empty() const { return _admission_queue.empty() && _memory_queue.empty(); } @@ -508,7 +507,7 @@ public: } size_t waiters() const { - return _wait_list.size(); + return _stats.waiters; } void broken(std::exception_ptr ex = {}); diff --git a/scylla-gdb.py b/scylla-gdb.py index 3afcb03a42..079ee9e888 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -2043,7 +2043,7 @@ class scylla_memory(gdb.Command): used_count = initial_count - int(semaphore["_resources"]["count"]) used_memory = initial_memory - int(semaphore["_resources"]["memory"]) try: - waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"]) + waiters = int(semaphore["_stats"]["waiters"]) except gdb.error: # 5.1 compatibility waiters = int(semaphore["_wait_list"]["_size"]) return f'{semaphore_name:<16} {used_count:>3}/{initial_count:>3}, {used_memory:>13}/{initial_memory:>13}, queued: {waiters}' @@ -5235,7 +5235,7 @@ class scylla_read_stats(gdb.Command): try: - waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"]) + waiters = int(semaphore["_stats"]["waiters"]) except gdb.error: # 5.1 compatibility waiters = int(semaphore["_wait_list"]["_size"]) diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index f1b821b92a..7b0ff8ab3d 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -1021,7 +1021,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // * 1 waiter // * no more count resources left auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout); - BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 2); // (waiters includes _ready_list entries) BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0);