diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 83fec82d89..10907647a5 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -700,11 +700,7 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() { reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader_v2 reader) noexcept { auto& permit_impl = *reader.permit()._impl; permit_impl.on_register_as_inactive(); - // Implies _inactive_reads.empty(), we don't queue new readers before - // evicting all inactive reads. - // Checking the _wait_list covers the count resources only, so check memory - // separately. - if (_wait_list.empty() && _resources.memory > 0) { + if (!should_evict_inactive_read()) { try { auto irp = std::make_unique(std::move(reader)); auto& ir = *irp; @@ -902,7 +898,7 @@ void reader_concurrency_semaphore::evict_readers_in_background() { // This is safe since stop() closes _gate; (void)with_gate(_close_readers_gate, [this] { return repeat([this] { - if (_wait_list.empty() || _inactive_reads.empty()) { + if (_inactive_reads.empty() || !should_evict_inactive_read()) { _evicting = false; return make_ready_future(stop_iteration::yes); } @@ -935,6 +931,17 @@ reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const return {can_admit::yes, reason::all_ok}; } +bool reader_concurrency_semaphore::should_evict_inactive_read() const noexcept { + if (_resources.memory < 0 || _resources.count < 0) { + return true; + } + if (_wait_list.empty()) { + return false; + } + const auto r = can_admit_read(_wait_list.front().permit).why; + return r == reason::memory_resources || r == reason::count_resources; +} + future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, read_func func) { if (!_execution_loop_future) { _execution_loop_future.emplace(execution_loop()); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 30c5c6b43e..c1c30ad540 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -230,6 +230,8 @@ private: struct admit_result { can_admit decision; reason why; }; admit_result can_admit_read(const reader_permit& permit) const noexcept; + bool should_evict_inactive_read() const noexcept; + void maybe_admit_waiters() noexcept; void on_permit_created(reader_permit::impl&); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index dfacbdb4c2..6eec0cad90 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -1115,3 +1115,247 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re permit = {}; stop_f.get(); } + +// Check that inactive reads are not needlessly evicted when admission is not +// blocked on resources. +// This test covers all the cases where eviction should **not** happen. +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicting) { + const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024}; + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100); + auto stop_sem = deferred_stop(semaphore); + + simple_schema ss; + auto s = ss.schema(); + + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + // There are available resources + { + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 3 * 1024); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + semaphore.set_resources(initial_resources); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle))); + } + + // Count resources are on the limit but no one wants more + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + semaphore.set_resources(initial_resources); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle))); + } + + // Memory resources are on the limit but no one wants more + { + auto units = permit1.consume_memory(3 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle))); + } + + // Up the resource count, we need more permits to check the rest of the scenarios + semaphore.set_resources({4, 4 * 1024}); + + // There are waiters but they are not blocked on resources + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + auto permit3 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + std::optional ug1{permit1}; + std::optional ug2{permit2}; + + auto permit4_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_queued_because_used_permits, 1); + + // First check the register path. + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit3)); + + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(permit3.get_state(), reader_permit::state::inactive); + + // Now check the callback admission path (admission check on resources being freed). + ug2.reset(); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(permit3.get_state(), reader_permit::state::inactive); + } +} + +// Check that inactive reads are evicted when they are blocking admission +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) { + const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024}; + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100); + auto stop_sem = deferred_stop(semaphore); + + simple_schema ss; + auto s = ss.schema(); + + uint64_t evicted_reads = 0; + + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + // No count resources - obtaining new permit + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + auto new_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No count resources - waiter + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No memory resources + { + auto units = permit1.consume_memory(3 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + auto new_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No memory resources - waiter + { + auto units = permit1.consume_memory(3 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No count resources - waiter blocked on something else too + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + std::optional ug{permit2}; + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + ug.reset(); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No memory resources - waiter blocked on something else too + { + semaphore.set_resources({initial_resources.count + 1, initial_resources.memory}); + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + auto units = permit1.consume_memory(2 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + std::optional ug{permit2}; + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + ug.reset(); + thread::yield(); // allow debug builds to schedule the fiber evicting the reads again + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + + semaphore.set_resources(initial_resources); + } +}