From 3e10c3fc891852e59514bb5ecbbece460db94702 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 22 Mar 2023 06:27:50 -0400 Subject: [PATCH] reader_concurrency_semaphore: don't evict inactive readers needlessly Inactive readers should only be evicted to free up resources for waiting readers. Evicting them when waiters are not admitted for any other reason than resources is wasteful and leads to extra load later on when these evicted readers have to be recreated and requeued. This patch changes the logic on both the registering path and the admission path to not evict inactive readers unless there are readers actually waiting on resources. A unit-test is also added, reproducing the overly-aggressive eviction and checking that it doesn't happen anymore. Fixes: #11803 Closes #13286 (cherry picked from commit bd57471e54af745d11245172e8802315478cbd28) --- reader_concurrency_semaphore.cc | 19 +- reader_concurrency_semaphore.hh | 2 + .../reader_concurrency_semaphore_test.cc | 244 ++++++++++++++++++ 3 files changed, 259 insertions(+), 6 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 83fec82d89..10907647a5 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -700,11 +700,7 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() { reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader_v2 reader) noexcept { auto& permit_impl = *reader.permit()._impl; permit_impl.on_register_as_inactive(); - // Implies _inactive_reads.empty(), we don't queue new readers before - // evicting all inactive reads. - // Checking the _wait_list covers the count resources only, so check memory - // separately. 
- if (_wait_list.empty() && _resources.memory > 0) { + if (!should_evict_inactive_read()) { try { auto irp = std::make_unique(std::move(reader)); auto& ir = *irp; @@ -902,7 +898,7 @@ void reader_concurrency_semaphore::evict_readers_in_background() { // This is safe since stop() closes _gate; (void)with_gate(_close_readers_gate, [this] { return repeat([this] { - if (_wait_list.empty() || _inactive_reads.empty()) { + if (_inactive_reads.empty() || !should_evict_inactive_read()) { _evicting = false; return make_ready_future(stop_iteration::yes); } @@ -935,6 +931,17 @@ reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const return {can_admit::yes, reason::all_ok}; } +bool reader_concurrency_semaphore::should_evict_inactive_read() const noexcept { + if (_resources.memory < 0 || _resources.count < 0) { + return true; + } + if (_wait_list.empty()) { + return false; + } + const auto r = can_admit_read(_wait_list.front().permit).why; + return r == reason::memory_resources || r == reason::count_resources; +} + future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, read_func func) { if (!_execution_loop_future) { _execution_loop_future.emplace(execution_loop()); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 30c5c6b43e..c1c30ad540 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -230,6 +230,8 @@ private: struct admit_result { can_admit decision; reason why; }; admit_result can_admit_read(const reader_permit& permit) const noexcept; + bool should_evict_inactive_read() const noexcept; + void maybe_admit_waiters() noexcept; void on_permit_created(reader_permit::impl&); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index dfacbdb4c2..6eec0cad90 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -1115,3 +1115,247 @@ 
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re permit = {}; stop_f.get(); } + +// Check that inactive reads are not needlessly evicted when admission is not +// blocked on resources. +// This test covers all the cases where eviction should **not** happen. +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicting) { + const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024}; + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100); + auto stop_sem = deferred_stop(semaphore); + + simple_schema ss; + auto s = ss.schema(); + + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + // There are available resources + { + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 3 * 1024); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + semaphore.set_resources(initial_resources); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle))); + } + + // Count resources are on the limit but no one wants more + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + semaphore.set_resources(initial_resources); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + 
BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle))); + } + + // Memory resources are on the limit but no one wants more + { + auto units = permit1.consume_memory(3 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle))); + } + + // Up the resource count, we need more permits to check the rest of the scenarios + semaphore.set_resources({4, 4 * 1024}); + + // There are waiters but they are not blocked on resources + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + auto permit3 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + std::optional ug1{permit1}; + std::optional ug2{permit2}; + + auto permit4_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_queued_because_used_permits, 1); + + // First check the register path. + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit3)); + + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(permit3.get_state(), reader_permit::state::inactive); + + // Now check the callback admission path (admission check on resources being freed). 
+ ug2.reset(); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + BOOST_REQUIRE_EQUAL(permit3.get_state(), reader_permit::state::inactive); + } +} + +// Check that inactive reads are evicted when they are blocking admission +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) { + const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024}; + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100); + auto stop_sem = deferred_stop(semaphore); + + simple_schema ss; + auto s = ss.schema(); + + uint64_t evicted_reads = 0; + + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + // No count resources - obtaining new permit + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + auto new_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No count resources - waiter + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 
1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No memory resources + { + auto units = permit1.consume_memory(3 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + auto new_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No memory resources - waiter + { + auto units = permit1.consume_memory(3 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No count resources - waiter blocked on 
something else too + { + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024); + + std::optional ug{permit2}; + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + ug.reset(); + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + } + + BOOST_REQUIRE(permit1.needs_readmission()); + permit1.wait_readmission().get(); + + // No memory resources - waiter blocked on something else too + { + semaphore.set_resources({initial_resources.count + 1, initial_resources.memory}); + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get(); + auto units = permit1.consume_memory(2 * 1024); + + BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1); + BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0); + + std::optional ug{permit2}; + + auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1)); + BOOST_REQUIRE(handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); + + ug.reset(); + thread::yield(); // allow debug builds to schedule the fiber evicting the reads again + BOOST_REQUIRE(!handle); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads); + + new_permit_fut.get(); + 
+ semaphore.set_resources(initial_resources); + } +}