Merge 'reader_concurrency_semaphore: register_inactive_read(): handle aborted permit' from Botond Dénes

It is possible that the permit handed in to register_inactive_read() is already aborted (currently only possible if permit timed out). If the permit also happens to have wait for memory, the current code will attempt to call promise<>::set_exception() on the permit's promise to abort its waiters. But if the permit was already aborted via timeout, this promise will already have an exception and this will trigger an assert. Add a separate case for checking if the permit is aborted already. If so, treat it as immediate eviction: close the reader and clean up.

Fixes: scylladb/scylladb#22919

Bug is present in all live versions, backports are required.

Closes scylladb/scylladb#23044

* github.com:scylladb/scylladb:
  reader_concurrency_semaphore: register_inactive_read(): handle aborted permit
  test/boost/reader_concurrency_semaphore_test: move away from db::timeout_clock::now()
This commit is contained in:
Pavel Emelyanov
2025-03-04 10:40:28 +03:00
2 changed files with 79 additions and 9 deletions

View File

@@ -735,7 +735,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
// resources and waitlist
{
reader_permit_opt permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
reader_permit_opt permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get();
require_can_admit(true, "enough resources");
@@ -763,7 +763,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
// need_cpu and awaits
{
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get();
require_can_admit(true, "!need_cpu");
{
@@ -792,7 +792,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
// forward progress -- readmission
{
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get();
auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
@@ -886,7 +886,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
{
check_admitting_enqueued_read(
[&] {
return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::no_timeout, {}).get());
},
[] (reader_permit_opt& permit1) {
permit1 = {};
@@ -901,7 +901,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
{
check_admitting_enqueued_read(
[&] {
return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::no_timeout, {}).get());
},
[&] (reader_permit_opt& permit1) {
return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1));
@@ -915,7 +915,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
{
check_admitting_enqueued_read(
[&] {
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get();
require_can_admit(true, "enough resources");
return std::pair(permit, std::optional<reader_permit::need_cpu_guard>{permit});
}, [&] (std::pair<reader_permit, std::optional<reader_permit::need_cpu_guard>>& permit_and_need_cpu_guard) {
@@ -931,7 +931,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
{
check_admitting_enqueued_read(
[&] {
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get();
require_can_admit(true, "enough resources");
return std::pair(permit, reader_permit::need_cpu_guard{permit});
}, [&] (std::pair<reader_permit, reader_permit::need_cpu_guard>& permit_and_need_cpu_guard) {
@@ -2230,7 +2230,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
::require_can_admit(schema, semaphore, expected_can_admit, description, sl);
};
auto permit1 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
auto permit1 = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get();
require_can_admit(true, "!need_cpu");
{
@@ -2238,7 +2238,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
require_can_admit(true, "need_cpu < cpu_concurrency");
auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get();
// no change
require_can_admit(true, "need_cpu < cpu_concurrency");
@@ -2312,4 +2312,65 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_wait_queue_overload_c
permit2_fut.get();
}
// Check that attempting to abort an already aborted permit is handled correctly.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort) {
simple_schema s;
const auto schema = s.schema();
const std::string test_name = get_name();
reader_concurrency_semaphore semaphore(
utils::updateable_value<int>(2),
2048,
test_name + " semaphore",
std::numeric_limits<size_t>::max(),
utils::updateable_value<uint32_t>(2),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(2),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
reader_permit_opt permit1 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get();
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted_immediately, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 0);
reader_permit_opt permit2 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get();
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted, 2);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted_immediately, 2);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 0);
// Exhaust all memory until serialize-limit triggers and make sure permit1 is
// the blessed one.
auto res1 = permit1->consume_memory(2048);
auto requested_memory1_fut = permit1->request_memory(1024);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 0);
auto requested_memory1 = requested_memory1_fut.get();
// Requesting memory for permit2 will queue the permit for memory.
auto requested_memory2_fut = permit2->request_memory(1024);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 1);
// Set timeout to 0 and wait for permit to time out.
permit2->set_timeout(db::timeout_clock::now());
eventually_true([&] {
try {
permit2->check_abort();
return false;
} catch (named_semaphore_timed_out&) {
return true;
} catch (...) {
BOOST_FAIL(format("unexpected exception while waiting for permit to time out: {}", std::current_exception()));
return true;
}
});
// Attempting to register a read which is queued on memory as inactive, will
// trigger an attempt to abort the read.
// This is where the double-abort happens.
auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit2));
BOOST_REQUIRE_THROW(requested_memory2_fut.get(), named_semaphore_timed_out);
}
BOOST_AUTO_TEST_SUITE_END()