diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 06a0907099..bc06c9eecc 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -690,6 +690,7 @@ static void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaph void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept { e.pr.set_exception(named_semaphore_timed_out(_semaphore._name)); + --_semaphore._stats.waiters; maybe_dump_reader_permit_diagnostics(_semaphore, "timed out"); } @@ -729,6 +730,7 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept { while (!_ready_list.empty()) { auto e = _ready_list.pop(); + --_stats.waiters; try { e.func(std::move(e.permit)).forward_to(std::move(e.pr)); @@ -798,6 +800,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na utils::updateable_value(std::numeric_limits::max())) {} reader_concurrency_semaphore::~reader_concurrency_semaphore() { + assert(!_stats.waiters); if (!_stats.total_permits) { // We allow destroy without stop() when the semaphore wasn't used at all yet. return; @@ -993,7 +996,7 @@ bool reader_concurrency_semaphore::all_used_permits_are_stalled() const { } std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_view queue_name) { - if ((_wait_list.size() + _ready_list.size()) >= _max_queue_length) { + if (_stats.waiters >= _max_queue_length) { _stats.total_reads_shed_due_to_overload++; maybe_dump_reader_permit_diagnostics(*this, fmt::format("{} queue overload", queue_name)); return std::make_exception_ptr(std::runtime_error(format("{}: {} queue overload", _name, queue_name))); @@ -1018,6 +1021,7 @@ future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read _wait_list.push_to_memory_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout); ++_stats.reads_enqueued_for_memory; } + ++_stats.waiters; return fut; } @@ -1125,8 +1129,10 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { } if (x.func) { _ready_list.push(std::move(x)); + // permit is just transferred to another queue, no need to update waiters counter } else { x.pr.set_value(); + --_stats.waiters; } } catch (...) { x.pr.set_exception(std::current_exception()); @@ -1237,6 +1243,7 @@ future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit, r promise<> pr; auto fut = pr.get_future(); _ready_list.push(entry(std::move(pr), std::move(permit), std::move(func))); + ++_stats.waiters; return fut; } @@ -1254,6 +1261,7 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) { while (!_wait_list.empty()) { _wait_list.front().pr.set_exception(ex); _wait_list.pop_front(); + --_stats.waiters; } } diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 609aedb7de..9a31f83cdc 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -107,6 +107,8 @@ public: uint64_t disk_reads = 0; // The number of sstables read currently. uint64_t sstables_read = 0; + // Permits waiting on something: admission, memory or execution + uint64_t waiters = 0; }; using permit_list_type = bi::list< @@ -204,9 +206,6 @@ private: expiring_fifo _memory_queue; public: wait_queue(expiry_handler eh) : _admission_queue(eh), _memory_queue(eh) { } - size_t size() const { - return _admission_queue.size() + _memory_queue.size(); - } bool empty() const { return _admission_queue.empty() && _memory_queue.empty(); } @@ -508,7 +507,7 @@ public: } size_t waiters() const { - return _wait_list.size(); + return _stats.waiters; } void broken(std::exception_ptr ex = {}); diff --git a/scylla-gdb.py b/scylla-gdb.py index 3afcb03a42..079ee9e888 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -2043,7 +2043,7 @@ class scylla_memory(gdb.Command): used_count = initial_count - int(semaphore["_resources"]["count"]) used_memory = initial_memory - int(semaphore["_resources"]["memory"]) try: - waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"]) + waiters = int(semaphore["_stats"]["waiters"]) except gdb.error: # 5.1 compatibility waiters = int(semaphore["_wait_list"]["_size"]) return f'{semaphore_name:<16} {used_count:>3}/{initial_count:>3}, {used_memory:>13}/{initial_memory:>13}, queued: {waiters}' @@ -5235,7 +5235,7 @@ class scylla_read_stats(gdb.Command): try: - waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"]) + waiters = int(semaphore["_stats"]["waiters"]) except gdb.error: # 5.1 compatibility waiters = int(semaphore["_wait_list"]["_size"]) diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index f1b821b92a..7b0ff8ab3d 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -1021,7 +1021,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // * 1 waiter // * no more count resources left auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout); - BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 2); // (waiters includes _ready_list entries) BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0);