diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 9c5b59ba2b..d4d13e1ff1 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -751,8 +751,6 @@ future<> reader_concurrency_semaphore::evict_inactive_reads_for_table(utils::UUI ++it; if (ir.reader.schema()->id() == id) { do_detach_inactive_reader(ir, evict_reason::manual); - ir.ttl_timer.cancel(); - ir.unlink(); evicted_readers.push_back(ir); } } @@ -785,6 +783,8 @@ future<> reader_concurrency_semaphore::stop() noexcept { } void reader_concurrency_semaphore::do_detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept { + ir.unlink(); + ir.ttl_timer.cancel(); ir.detach(); ir.reader.permit()._impl->on_evicted(); try { @@ -865,18 +865,20 @@ void reader_concurrency_semaphore::evict_readers_in_background() { // Evict inactive readers in the background while wait list isn't empty // This is safe since stop() closes _gate; (void)with_gate(_close_readers_gate, [this] { - return do_until([this] { return _wait_list.empty() || _inactive_reads.empty(); }, [this] { - return detach_inactive_reader(_inactive_reads.front(), evict_reason::permit).close(); + return repeat([this] { + if (_wait_list.empty() || _inactive_reads.empty()) { + _evicting = false; + return make_ready_future(stop_iteration::yes); + } + return detach_inactive_reader(_inactive_reads.front(), evict_reason::permit).close().then([] { + return stop_iteration::no; + }); }); - }).finally([this] { _evicting = false; }); + }); } reader_concurrency_semaphore::can_admit -reader_concurrency_semaphore::can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept { - if (wait_list_empty && !_wait_list.empty()) { - return can_admit::no; - } - +reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const noexcept { if (!_ready_list.empty()) { return can_admit::no; } @@ -901,10 +903,17 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r _execution_loop_future.emplace(execution_loop()); } - const auto admit = can_admit_read(permit, require_empty_waitlist::yes); - if (admit != can_admit::yes) { + const auto admit = can_admit_read(permit); + if (admit != can_admit::yes || !_wait_list.empty()) { auto fut = enqueue_waiter(std::move(permit), std::move(func)); - if (admit == can_admit::maybe) { + if (admit == can_admit::yes && !_wait_list.empty()) { + // This is a contradiction: the semaphore could admit waiters yet it has waiters. + // Normally, the semaphore should admit waiters as soon as it can. + // So at any point in time, there should either be no waiters, or it + // shouldn't be able to admit new reads. Otherwise something went wrong. + maybe_dump_reader_permit_diagnostics(*this, _permit_list, "semaphore could admit new reads yet there are waiters"); + maybe_admit_waiters(); + } else if (admit == can_admit::maybe) { evict_readers_in_background(); } return fut; @@ -920,7 +929,7 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { auto admit = can_admit::no; - while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit, require_empty_waitlist::no)) == can_admit::yes) { + while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit)) == can_admit::yes) { auto& x = _wait_list.front(); try { x.permit.on_admission(); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 123507c89d..60a9e036b0 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -204,12 +204,12 @@ private: future<> do_wait_admission(reader_permit permit, read_func func = {}); // Check whether permit can be admitted or not. - // Caller can specify whether wait list should be empty or not for admission - // to be possible. can_admit::maybe means admission might be possible if some - // of the inactive readers are evicted. + // The wait list is not taken into consideration, this is the caller's + // responsibility. + // A return value of can_admit::maybe means admission might be possible if + // some of the inactive readers are evicted. enum class can_admit { no, maybe, yes }; - using require_empty_waitlist = bool_class; - can_admit can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept; + can_admit can_admit_read(const reader_permit& permit) const noexcept; void maybe_admit_waiters() noexcept;