diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 80de31d94d..9c5b59ba2b 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -858,35 +858,58 @@ future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read } void reader_concurrency_semaphore::evict_readers_in_background() { + if (_evicting) { + return; + } + _evicting = true; // Evict inactive readers in the background while wait list isn't empty // This is safe since stop() closes _gate; (void)with_gate(_close_readers_gate, [this] { return do_until([this] { return _wait_list.empty() || _inactive_reads.empty(); }, [this] { return detach_inactive_reader(_inactive_reads.front(), evict_reason::permit).close(); }); - }); - } + }).finally([this] { _evicting = false; }); +} + +reader_concurrency_semaphore::can_admit +reader_concurrency_semaphore::can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept { + if (wait_list_empty && !_wait_list.empty()) { + return can_admit::no; + } + + if (!_ready_list.empty()) { + return can_admit::no; + } + + if (!all_used_permits_are_stalled()) { + return can_admit::no; + } + + if (!has_available_units(permit.base_resources())) { + if (_inactive_reads.empty()) { + return can_admit::no; + } else { + return can_admit::maybe; + } + } + + return can_admit::yes; +} future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, read_func func) { if (!_execution_loop_future) { _execution_loop_future.emplace(execution_loop()); } - if (!_wait_list.empty() || !_ready_list.empty()) { - return enqueue_waiter(std::move(permit), std::move(func)); - } - if (!has_available_units(permit.base_resources())) { + const auto admit = can_admit_read(permit, require_empty_waitlist::yes); + if (admit != can_admit::yes) { auto fut = enqueue_waiter(std::move(permit), std::move(func)); - if (!_inactive_reads.empty()) { + if (admit == can_admit::maybe) { evict_readers_in_background(); } return fut; } - if (!all_used_permits_are_stalled()) { - return enqueue_waiter(std::move(permit), std::move(func)); - } - permit.on_admission(); ++_stats.reads_admitted; if (func) { @@ -896,7 +919,8 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r } void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { - while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources()) && all_used_permits_are_stalled()) { + auto admit = can_admit::no; + while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit, require_empty_waitlist::no)) == can_admit::yes) { auto& x = _wait_list.front(); try { x.permit.on_admission(); @@ -911,6 +935,10 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { } _wait_list.pop_front(); } + if (admit == can_admit::maybe) { + // Evicting readers will trigger another call to `maybe_admit_waiters()` from `signal()`. + evict_readers_in_background(); + } } void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index a8a04770b0..123507c89d 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -181,6 +181,7 @@ private: stats _stats; permit_list_type _permit_list; bool _stopped = false; + bool _evicting = false; gate _close_readers_gate; gate _permit_gate; std::optional> _execution_loop_future; @@ -201,6 +202,15 @@ private: future<> enqueue_waiter(reader_permit permit, read_func func); void evict_readers_in_background(); future<> do_wait_admission(reader_permit permit, read_func func = {}); + + // Check whether permit can be admitted or not. + // Caller can specify whether wait list should be empty or not for admission + // to be possible. can_admit::maybe means admission might be possible if some + // of the inactive readers are evicted. + enum class can_admit { no, maybe, yes }; + using require_empty_waitlist = bool_class; + can_admit can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept; + void maybe_admit_waiters() noexcept; void on_permit_created(reader_permit::impl&); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index f5cedfec0c..780f3d62dc 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -946,3 +946,82 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ handles.clear(); } } + +// Reproduces https://github.com/scylladb/scylladb/issues/11770 +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_when_all_is_blocked) { + simple_schema ss; + const auto& s = *ss.schema(); + + const auto initial_resources = reader_concurrency_semaphore::resources{2, 32 * 1024}; + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); + auto stop_sem = deferred_stop(semaphore); + + class read { + reader_permit _permit; + promise<> _read_started_pr; + future<> _read_started_fut; + promise<> _read_done_pr; + reader_permit::used_guard _ug; + std::optional _bg; + + public: + explicit read(reader_permit p) : _permit(std::move(p)), _read_started_fut(_read_started_pr.get_future()), _ug(_permit) { } + future<> wait_read_started() { return std::move(_read_started_fut); } + void set_read_done() { _read_done_pr.set_value(); } + void mark_as_blocked() { _bg.emplace(_permit); } + void mark_as_unblocked() { _bg.reset(); } + reader_concurrency_semaphore::read_func get_read_func() { + return [this] (reader_permit permit) -> future<> { + _read_started_pr.set_value(); + co_await _read_done_pr.get_future(); + }; + } + }; + + auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1)); + + auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + read rd2(p2); + auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func()); + + // At this point we expect to have: + // * 1 inactive read (not evicted) + // * 1 used (but not blocked) read on the ready list + // * 1 waiter + // * no more count resources left + auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE(irh1); + + // Start the read emptying the ready list, this should not be enough to admit p3 + rd2.wait_read_started().get(); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE(irh1); + + // Marking p2 as blocked should now allow p3 to be admitted by evicting p1 + rd2.mark_as_blocked(); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE(!irh1); + + p3_fut.get(); + rd2.mark_as_unblocked(); + rd2.set_read_done(); + fut2.get(); +}