reader_concurrency_semaphore: don't evict inactive readers needlessly
Inactive readers should only be evicted to free up resources for waiting
readers. Evicting them when waiters are not admitted for any other
reason than resources is wasteful and leads to extra load later on when
these evicted readers have to be recreated end requeued.
This patch changes the logic on both the registering path and the
admission path to not evict inactive readers unless there are readers
actually waiting on resources.
A unit-test is also added, reproducing the overly-agressive eviction and
checking that it doesn't happen anymore.
Fixes: #11803
Closes #13286
(cherry picked from commit bd57471e54)
This commit is contained in:
@@ -700,11 +700,7 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader_v2 reader) noexcept {
|
||||
auto& permit_impl = *reader.permit()._impl;
|
||||
permit_impl.on_register_as_inactive();
|
||||
// Implies _inactive_reads.empty(), we don't queue new readers before
|
||||
// evicting all inactive reads.
|
||||
// Checking the _wait_list covers the count resources only, so check memory
|
||||
// separately.
|
||||
if (_wait_list.empty() && _resources.memory > 0) {
|
||||
if (!should_evict_inactive_read()) {
|
||||
try {
|
||||
auto irp = std::make_unique<inactive_read>(std::move(reader));
|
||||
auto& ir = *irp;
|
||||
@@ -902,7 +898,7 @@ void reader_concurrency_semaphore::evict_readers_in_background() {
|
||||
// This is safe since stop() closes _gate;
|
||||
(void)with_gate(_close_readers_gate, [this] {
|
||||
return repeat([this] {
|
||||
if (_wait_list.empty() || _inactive_reads.empty()) {
|
||||
if (_inactive_reads.empty() || !should_evict_inactive_read()) {
|
||||
_evicting = false;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
@@ -935,6 +931,17 @@ reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const
|
||||
return {can_admit::yes, reason::all_ok};
|
||||
}
|
||||
|
||||
bool reader_concurrency_semaphore::should_evict_inactive_read() const noexcept {
|
||||
if (_resources.memory < 0 || _resources.count < 0) {
|
||||
return true;
|
||||
}
|
||||
if (_wait_list.empty()) {
|
||||
return false;
|
||||
}
|
||||
const auto r = can_admit_read(_wait_list.front().permit).why;
|
||||
return r == reason::memory_resources || r == reason::count_resources;
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, read_func func) {
|
||||
if (!_execution_loop_future) {
|
||||
_execution_loop_future.emplace(execution_loop());
|
||||
|
||||
@@ -230,6 +230,8 @@ private:
|
||||
struct admit_result { can_admit decision; reason why; };
|
||||
admit_result can_admit_read(const reader_permit& permit) const noexcept;
|
||||
|
||||
bool should_evict_inactive_read() const noexcept;
|
||||
|
||||
void maybe_admit_waiters() noexcept;
|
||||
|
||||
void on_permit_created(reader_permit::impl&);
|
||||
|
||||
@@ -1115,3 +1115,247 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re
|
||||
permit = {};
|
||||
stop_f.get();
|
||||
}
|
||||
|
||||
// Check that inactive reads are not needlessly evicted when admission is not
|
||||
// blocked on resources.
|
||||
// This test covers all the cases where eviction should **not** happen.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicting) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
|
||||
// There are available resources
|
||||
{
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 3 * 1024);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
semaphore.set_resources(initial_resources);
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle)));
|
||||
}
|
||||
|
||||
// Count resources are on the limit but no one wants more
|
||||
{
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
semaphore.set_resources(initial_resources);
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle)));
|
||||
}
|
||||
|
||||
// Memory resources are on the limit but no one wants more
|
||||
{
|
||||
auto units = permit1.consume_memory(3 * 1024);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
BOOST_REQUIRE(semaphore.unregister_inactive_read(std::move(handle)));
|
||||
}
|
||||
|
||||
// Up the resource count, we need more permits to check the rest of the scenarios
|
||||
semaphore.set_resources({4, 4 * 1024});
|
||||
|
||||
// There are waiters but they are not blocked on resources
|
||||
{
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
auto permit3 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
|
||||
std::optional<reader_permit::used_guard> ug1{permit1};
|
||||
std::optional<reader_permit::used_guard> ug2{permit2};
|
||||
|
||||
auto permit4_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_queued_because_used_permits, 1);
|
||||
|
||||
// First check the register path.
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit3));
|
||||
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
BOOST_REQUIRE_EQUAL(permit3.get_state(), reader_permit::state::inactive);
|
||||
|
||||
// Now check the callback admission path (admission check on resources being freed).
|
||||
ug2.reset();
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
BOOST_REQUIRE_EQUAL(permit3.get_state(), reader_permit::state::inactive);
|
||||
}
|
||||
}
|
||||
|
||||
// Check that inactive reads are evicted when they are blocking admission
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
uint64_t evicted_reads = 0;
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
|
||||
// No count resources - obtaining new permit
|
||||
{
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
auto new_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
BOOST_REQUIRE(!handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(permit1.needs_readmission());
|
||||
permit1.wait_readmission().get();
|
||||
|
||||
// No count resources - waiter
|
||||
{
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024);
|
||||
|
||||
auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(!handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
|
||||
new_permit_fut.get();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(permit1.needs_readmission());
|
||||
permit1.wait_readmission().get();
|
||||
|
||||
// No memory resources
|
||||
{
|
||||
auto units = permit1.consume_memory(3 * 1024);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
auto new_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
BOOST_REQUIRE(!handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(permit1.needs_readmission());
|
||||
permit1.wait_readmission().get();
|
||||
|
||||
// No memory resources - waiter
|
||||
{
|
||||
auto units = permit1.consume_memory(3 * 1024);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0);
|
||||
|
||||
auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(!handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
|
||||
new_permit_fut.get();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(permit1.needs_readmission());
|
||||
permit1.wait_readmission().get();
|
||||
|
||||
// No count resources - waiter blocked on something else too
|
||||
{
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 2 * 1024);
|
||||
|
||||
std::optional<reader_permit::used_guard> ug{permit2};
|
||||
|
||||
auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
ug.reset();
|
||||
BOOST_REQUIRE(!handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
|
||||
new_permit_fut.get();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(permit1.needs_readmission());
|
||||
permit1.wait_readmission().get();
|
||||
|
||||
// No memory resources - waiter blocked on something else too
|
||||
{
|
||||
semaphore.set_resources({initial_resources.count + 1, initial_resources.memory});
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get();
|
||||
auto units = permit1.consume_memory(2 * 1024);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().memory, 0);
|
||||
|
||||
std::optional<reader_permit::used_guard> ug{permit2};
|
||||
|
||||
auto new_permit_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit1));
|
||||
BOOST_REQUIRE(handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
ug.reset();
|
||||
thread::yield(); // allow debug builds to schedule the fiber evicting the reads again
|
||||
BOOST_REQUIRE(!handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
|
||||
new_permit_fut.get();
|
||||
|
||||
semaphore.set_resources(initial_resources);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user