diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 3f57cb08bc..0b637eb410 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -491,6 +491,10 @@ public: _trace_ptr = std::move(trace_ptr); } + bool aborted() const { + return bool(_ex); + } + void check_abort() const { if (_ex) { std::rethrow_exception(_ex); @@ -1124,6 +1128,11 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() { reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(mutation_reader reader, const dht::partition_range* range) noexcept { auto& permit = reader.permit(); + if (permit->aborted()) { + permit->release_base_resources(); + close_reader(std::move(reader)); + return inactive_read_handle(); + } if (permit->get_state() == reader_permit::state::waiting_for_memory) { // Kill all outstanding memory requests, the read is going to be evicted. permit->aux_data().pr.set_exception(std::make_exception_ptr(std::bad_alloc{})); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 43c913712e..aec44e4362 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -735,7 +735,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // resources and waitlist { - reader_permit_opt permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); + reader_permit_opt permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get(); require_can_admit(true, "enough resources"); @@ -763,7 +763,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // need_cpu and awaits { - auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get(); require_can_admit(true, "!need_cpu"); { @@ -792,7 +792,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- readmission { - auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get(); auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); @@ -886,7 +886,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); + return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::no_timeout, {}).get()); }, [] (reader_permit_opt& permit1) { permit1 = {}; @@ -901,7 +901,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); + return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::no_timeout, {}).get()); }, [&] (reader_permit_opt& permit1) { return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1)); @@ -915,7 +915,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, std::optional{permit}); }, [&] (std::pair>& permit_and_need_cpu_guard) { @@ -931,7 +931,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, reader_permit::need_cpu_guard{permit}); }, [&] (std::pair& permit_and_need_cpu_guard) { @@ -2230,7 +2230,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu ::require_can_admit(schema, semaphore, expected_can_admit, description, sl); }; - auto permit1 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit1 = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get(); require_can_admit(true, "!need_cpu"); { @@ -2238,7 +2238,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu require_can_admit(true, "need_cpu < cpu_concurrency"); - auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}).get(); // no change require_can_admit(true, "need_cpu < cpu_concurrency"); @@ -2312,4 +2312,65 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_wait_queue_overload_c permit2_fut.get(); } +// Check that attempting to abort an already aborted permit is handled correctly. +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort) { + simple_schema s; + const auto schema = s.schema(); + + const std::string test_name = get_name(); + + reader_concurrency_semaphore semaphore( + utils::updateable_value(2), + 2048, + test_name + " semaphore", + std::numeric_limits::max(), + utils::updateable_value(2), + utils::updateable_value(400), + utils::updateable_value(2), + reader_concurrency_semaphore::register_metrics::no); + auto stop_sem = deferred_stop(semaphore); + + reader_permit_opt permit1 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get(); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted_immediately, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 0); + + reader_permit_opt permit2 = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get(); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted, 2); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_admitted_immediately, 2); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 0); + + // Exhaust all memory until serialize-limit triggers and make sure permit1 is + // the blessed one. + auto res1 = permit1->consume_memory(2048); + auto requested_memory1_fut = permit1->request_memory(1024); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 0); + auto requested_memory1 = requested_memory1_fut.get(); + + // Requesting memory for permit2 will queue the permit for memory. + auto requested_memory2_fut = permit2->request_memory(1024); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 1); + + // Set timeout to 0 and wait for permit to time out. + permit2->set_timeout(db::timeout_clock::now()); + eventually_true([&] { + try { + permit2->check_abort(); + return false; + } catch (named_semaphore_timed_out&) { + return true; + } catch (...) { + BOOST_FAIL(format("unexpected exception while waiting for permit to time out: {}", std::current_exception())); + return true; + } + }); + + // Attempting to register a read which is queued on memory as inactive, will + // trigger an attempt to abort the read. + // This is where the double-abort happens. + auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit2)); + + BOOST_REQUIRE_THROW(requested_memory2_fut.get(), named_semaphore_timed_out); +} + BOOST_AUTO_TEST_SUITE_END()