From 0e388d2140b6863ce8c52ececd12d1df198655df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 14 Oct 2022 07:53:38 +0300 Subject: [PATCH] reader_concurrency_semaphore: unify admission logic across all paths The semaphore currently has two admission paths: the obtain_permit()/with_permit() methods which admits permits on user request (the front door) and the maybe_admit_waiters() which admits permits based on internal events like memory resource being returned (the back door). The two paths used their own admission conditions and naturally this means that they diverged in time. Notably, maybe_admit_waiters() did not look at inactive readers assuming that if there are waiters there cannot be inactive readers. This is not true however since we merged the execution-stage into the semaphore. Waiters can queue up even when there are inactive reads and thus maybe_admit_waiters() has to consider evicting some of them to see if this would allow for admitting new reads. To avoid such divergence in the future, the admission logic was moved into a new method can_admit_read() which is now shared between the two method families. This method now checks for the possibility of evicting inactive readers as well. The admission logic was tuned slightly to only consider evicting inactive readers if there is a real possibility that this will result in admissions: notably, before this patch, resource availability was checked before stalls were (used permits == blocked permits), so we could evict readers even if this couldn't help. Because now eviction can be started from maybe_admit_waiters(), which is also downstream from eviction, we added a flag to avoid recursive evict -> maybe admit -> evict ... loops. Fixes: #11770 Closes #11784 (cherry picked from commit 7fbad8de875816f0fb71ffa1965ba65543354629) --- reader_concurrency_semaphore.cc | 52 +++++++++--- reader_concurrency_semaphore.hh | 10 +++ .../reader_concurrency_semaphore_test.cc | 79 +++++++++++++++++++ 3 files changed, 129 insertions(+), 12 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 80de31d94d..9c5b59ba2b 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -858,35 +858,58 @@ future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read } void reader_concurrency_semaphore::evict_readers_in_background() { + if (_evicting) { + return; + } + _evicting = true; // Evict inactive readers in the background while wait list isn't empty // This is safe since stop() closes _gate; (void)with_gate(_close_readers_gate, [this] { return do_until([this] { return _wait_list.empty() || _inactive_reads.empty(); }, [this] { return detach_inactive_reader(_inactive_reads.front(), evict_reason::permit).close(); }); - }); - } + }).finally([this] { _evicting = false; }); +} + +reader_concurrency_semaphore::can_admit +reader_concurrency_semaphore::can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept { + if (wait_list_empty && !_wait_list.empty()) { + return can_admit::no; + } + + if (!_ready_list.empty()) { + return can_admit::no; + } + + if (!all_used_permits_are_stalled()) { + return can_admit::no; + } + + if (!has_available_units(permit.base_resources())) { + if (_inactive_reads.empty()) { + return can_admit::no; + } else { + return can_admit::maybe; + } + } + + return can_admit::yes; +} future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, read_func func) { if (!_execution_loop_future) { _execution_loop_future.emplace(execution_loop()); } - if (!_wait_list.empty() || !_ready_list.empty()) { - return enqueue_waiter(std::move(permit), std::move(func)); - } - if (!has_available_units(permit.base_resources())) { + const auto admit = can_admit_read(permit, require_empty_waitlist::yes); + if (admit != can_admit::yes) { auto fut = enqueue_waiter(std::move(permit), std::move(func)); - if (!_inactive_reads.empty()) { + if (admit == can_admit::maybe) { evict_readers_in_background(); } return fut; } - if (!all_used_permits_are_stalled()) { - return enqueue_waiter(std::move(permit), std::move(func)); - } - permit.on_admission(); ++_stats.reads_admitted; if (func) { @@ -896,7 +919,8 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r } void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { - while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources()) && all_used_permits_are_stalled()) { + auto admit = can_admit::no; + while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit, require_empty_waitlist::no)) == can_admit::yes) { auto& x = _wait_list.front(); try { x.permit.on_admission(); @@ -911,6 +935,10 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { } _wait_list.pop_front(); } + if (admit == can_admit::maybe) { + // Evicting readers will trigger another call to `maybe_admit_waiters()` from `signal()`. + evict_readers_in_background(); + } } void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index a8a04770b0..123507c89d 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -181,6 +181,7 @@ private: stats _stats; permit_list_type _permit_list; bool _stopped = false; + bool _evicting = false; gate _close_readers_gate; gate _permit_gate; std::optional> _execution_loop_future; @@ -201,6 +202,15 @@ private: future<> enqueue_waiter(reader_permit permit, read_func func); void evict_readers_in_background(); future<> do_wait_admission(reader_permit permit, read_func func = {}); + + // Check whether permit can be admitted or not. + // Caller can specify whether wait list should be empty or not for admission + // to be possible. can_admit::maybe means admission might be possible if some + // of the inactive readers are evicted. + enum class can_admit { no, maybe, yes }; + using require_empty_waitlist = bool_class; + can_admit can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept; + void maybe_admit_waiters() noexcept; void on_permit_created(reader_permit::impl&); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index f5cedfec0c..780f3d62dc 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -946,3 +946,82 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ handles.clear(); } } + +// Reproduces https://github.com/scylladb/scylladb/issues/11770 +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_when_all_is_blocked) { + simple_schema ss; + const auto& s = *ss.schema(); + + const auto initial_resources = reader_concurrency_semaphore::resources{2, 32 * 1024}; + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); + auto stop_sem = deferred_stop(semaphore); + + class read { + reader_permit _permit; + promise<> _read_started_pr; + future<> _read_started_fut; + promise<> _read_done_pr; + reader_permit::used_guard _ug; + std::optional _bg; + + public: + explicit read(reader_permit p) : _permit(std::move(p)), _read_started_fut(_read_started_pr.get_future()), _ug(_permit) { } + future<> wait_read_started() { return std::move(_read_started_fut); } + void set_read_done() { _read_done_pr.set_value(); } + void mark_as_blocked() { _bg.emplace(_permit); } + void mark_as_unblocked() { _bg.reset(); } + reader_concurrency_semaphore::read_func get_read_func() { + return [this] (reader_permit permit) -> future<> { + _read_started_pr.set_value(); + co_await _read_done_pr.get_future(); + }; + } + }; + + auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1)); + + auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + read rd2(p2); + auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func()); + + // At this point we expect to have: + // * 1 inactive read (not evicted) + // * 1 used (but not blocked) read on the ready list + // * 1 waiter + // * no more count resources left + auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE(irh1); + + // Start the read emptying the ready list, this should not be enough to admit p3 + rd2.wait_read_started().get(); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE(irh1); + + // Marking p2 as blocked should now allow p3 to be admitted by evicting p1 + rd2.mark_as_blocked(); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE(!irh1); + + p3_fut.get(); + rd2.mark_as_unblocked(); + rd2.set_read_done(); + fut2.get(); +}