From 8829098e90146ab1ca0754bbf2b7957de1ef434a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paszkowski?= Date: Mon, 16 Dec 2024 11:03:02 +0100 Subject: [PATCH 1/7] reader_concurrency_semaphore: Remove cpu_concurrency's default value The commit 59faa6d, introduces a new parameter called cpu_concurrency and sets its default value to 1 which violates the commit fbb83dd that removes all default values from constructors but one used by the unit tests. The patch removes the default value of the cpu_concurrency parameter and alters tests to use the test dedicated reader_concurrency_semaphore constructor wherever possible. --- reader_concurrency_semaphore.hh | 3 +- replica/database.cc | 1 + sstables/sstables_manager.cc | 1 + .../reader_concurrency_semaphore_test.cc | 37 +++++++------------ test/boost/row_cache_test.cc | 4 +- 5 files changed, 19 insertions(+), 27 deletions(-) diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 6e0a110dcb..3e09d06449 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -296,9 +296,10 @@ public: size_t max_queue_length, utils::updateable_value serialize_limit_multiplier, utils::updateable_value kill_limit_multiplier, + utils::updateable_value cpu_concurrency, register_metrics metrics) : reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, - std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), utils::updateable_value(1), metrics) + std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), std::move(cpu_concurrency), metrics) { } /// Create a semaphore with practically unlimited count and memory. 
diff --git a/replica/database.cc b/replica/database.cc index f1534326cc..f123c4f7fe 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -423,6 +423,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat std::numeric_limits::max(), utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value(uint32_t(1)), reader_concurrency_semaphore::register_metrics::yes) , _view_update_read_concurrency_semaphores_group( max_memory_concurrent_view_update_reads(), diff --git a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index 6e0396878a..a4768dc54f 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -45,6 +45,7 @@ sstables_manager::sstables_manager( std::numeric_limits::max(), utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value(uint32_t(1)), reader_concurrency_semaphore::register_metrics::no) , _dir_semaphore(dir_sem) , _resolve_host_id(std::move(resolve_host_id)) diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index c0c696a3cb..6cc543c635 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -1472,8 +1472,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_leaks const auto initial_resources = reader_concurrency_semaphore::resources{4, 4 * 1024}; const auto serialize_multiplier = 2; const auto kill_multiplier = 3; - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, - utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, 
initial_resources.memory, 100, + utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier)); auto stop_sem = deferred_stop(semaphore); const size_t reader_count_target = 6; @@ -1726,9 +1726,8 @@ SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_engages) { SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preserves_state) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; const auto serialize_multiplier = 2; - const auto kill_multiplier = std::numeric_limits::max(); // we don't want this to interfere with our test - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, - utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, + initial_resources.memory, 100, utils::updateable_value(serialize_multiplier)); auto stop_sem = deferred_stop(semaphore); auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get(); @@ -1789,9 +1788,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_inactive) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; const auto serialize_multiplier = 2; - const auto kill_multiplier = std::numeric_limits::max(); // we don't want this to interfere with our test - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, - utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 
initial_resources.count, + initial_resources.memory, 100, utils::updateable_value(serialize_multiplier)); auto stop_sem = deferred_stop(semaphore); simple_schema ss; @@ -1851,9 +1849,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_memory_goes_inactive) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; const auto serialize_multiplier = 2; - const auto kill_multiplier = std::numeric_limits::max(); // we don't want this to interfere with our test - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, - utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, + initial_resources.memory, 100, utils::updateable_value(serialize_multiplier)); auto stop_sem = deferred_stop(semaphore); auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get(); @@ -1897,10 +1894,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me // This test covers all the cases where eviction should **not** happen. 
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicting) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024}; - const auto serialize_multiplier = std::numeric_limits::max(); - const auto kill_multiplier = std::numeric_limits::max(); - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, - utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100); auto stop_sem = deferred_stop(semaphore); simple_schema ss; @@ -1990,10 +1984,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicti // Check that inactive reads are evicted when they are blocking admission SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024}; - const auto serialize_multiplier = std::numeric_limits::max(); - const auto kill_multiplier = std::numeric_limits::max(); - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, - utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100); auto stop_sem = deferred_stop(semaphore); simple_schema ss; @@ -2147,10 +2138,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) { // resources. 
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_execution_stage_wakeup) { const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024}; - const auto serialize_multiplier = std::numeric_limits::max(); - const auto kill_multiplier = std::numeric_limits::max(); - reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, - utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100); auto stop_sem = deferred_stop(semaphore); auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get(); @@ -2186,6 +2174,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) { const uint32_t initial_memory = 4 * 1024; const auto serialize_multiplier = std::numeric_limits::max(); const auto kill_multiplier = std::numeric_limits::max(); + const auto cpu_concurrency = 1; reader_concurrency_semaphore semaphore( utils::updateable_value(count), @@ -2194,7 +2183,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) { 100, utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), - utils::updateable_value(1), + utils::updateable_value(cpu_concurrency), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index 22b99a395f..a17ca97110 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4837,8 +4837,8 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) { // of course doesn't necessarily help release pressure on the semaphore. 
SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) { simple_schema s; - reader_concurrency_semaphore semaphore(100, 1, get_name(), std::numeric_limits::max(), utils::updateable_value(1), - utils::updateable_value(1), reader_concurrency_semaphore::register_metrics::no); + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 1, std::numeric_limits::max(), + utils::updateable_value(1), utils::updateable_value(1)); auto stop_semaphore = deferred_stop(semaphore); cache_tracker tracker; From 5a7cea00d084fbcdf46d32f33006876ba33c304d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paszkowski?= Date: Fri, 23 Jan 2026 16:33:26 +0100 Subject: [PATCH 2/7] reader_concurrency_semaphore: validate waiters counter when dequeueing a waiting permit Add a defensive check in dequeue_permit() to avoid underflowing _stats.waiters and report an internal error if the stats are already inconsistent. --- reader_concurrency_semaphore.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 5000143ad1..76dec540a0 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -1549,7 +1549,11 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) { case reader_permit::state::waiting_for_admission: case reader_permit::state::waiting_for_memory: case reader_permit::state::waiting_for_execution: - --_stats.waiters; + if (_stats.waiters > 0) { + --_stats.waiters; + } else { + on_internal_error_noexcept(rcslog, "reader_concurrency_semaphore::dequeue_permit(): invalid state: no waiters yet dequeueing a waiting permit"); + } break; case reader_permit::state::inactive: case reader_permit::state::evicted: From 2d3a40e023512d5e7eb762713d2bcb2a562ad0d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paszkowski?= Date: Mon, 26 Jan 2026 09:16:40 +0100 Subject: [PATCH 3/7] permit_reader: Add a new 
state: preemptive_aborted A permit gets into the preemptive_aborted state when: - times out; - gets rejected from execution due to high chance its execution would not finalize on time; Being in this state means a permit was removed from the wait list, its internal timer was canceled and semaphore's statistic `total_reads_shed_due_to_overload` increased. --- docs/dev/reader-concurrency-semaphore.md | 1 + reader_concurrency_semaphore.cc | 24 ++++++++++++++++++++++++ reader_concurrency_semaphore.hh | 5 ++++- reader_permit.hh | 1 + 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/docs/dev/reader-concurrency-semaphore.md b/docs/dev/reader-concurrency-semaphore.md index 740d9dc5a0..3fc30256bb 100644 --- a/docs/dev/reader-concurrency-semaphore.md +++ b/docs/dev/reader-concurrency-semaphore.md @@ -78,6 +78,7 @@ Permits are in one of the following states: * `active/await` - a previously `active/need_cpu` permit, which needs something other than CPU to proceed, it is waiting on I/O or a remote shards, other permits can be admitted while the permit is in this state, pending resource availability; * `inactive` - the permit was marked inactive, it can be evicted to make room for admitting more permits if needed; * `evicted` - a former inactive permit which was evicted, the permit has to undergo admission again for the read to resume; +* `preemptive_aborted` - the permit timed out or was rejected during admission as it was detected the read might time out later during execution; Note that some older releases will have different names for some of these states or lack some of the states altogether: diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 76dec540a0..66c451b9b1 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -239,8 +239,13 @@ private: _semaphore.evict(*this, reader_concurrency_semaphore::evict_reason::time); break; case state::evicted: + case state::preemptive_aborted: break; } + + // 
The function call not only sets state to reader_permit::state::preemptive_aborted + // but also correctly decreases the statistics i.e. need_cpu_permits and awaits_permits. + on_permit_inactive(reader_permit::state::preemptive_aborted); } public: @@ -360,6 +365,17 @@ public: on_permit_active(); } + void on_preemptive_aborted() { + if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) { + on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state)); + } + + _ttl_timer.cancel(); + _state = reader_permit::state::preemptive_aborted; + _aux_data.pr.set_exception(named_semaphore_aborted(_semaphore._name)); + _semaphore.on_permit_preemptive_aborted(); + } + void on_register_as_inactive() { SCYLLA_ASSERT(_state == reader_permit::state::active || _state == reader_permit::state::active_need_cpu || _state == reader_permit::state::waiting_for_memory); on_permit_inactive(reader_permit::state::inactive); @@ -689,6 +705,9 @@ auto fmt::formatter::format(reader_permit::state s, fmt::f case reader_permit::state::evicted: name = "evicted"; break; + case reader_permit::state::preemptive_aborted: + name = "preemptive_aborted"; + break; } return formatter::format(name, ctx); } @@ -1562,12 +1581,17 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) { case reader_permit::state::active: case reader_permit::state::active_need_cpu: case reader_permit::state::active_await: + case reader_permit::state::preemptive_aborted: on_internal_error_noexcept(rcslog, format("reader_concurrency_semaphore::dequeue_permit(): unrecognized queued state: {}", permit.get_state())); } permit.unlink(); _permit_list.push_back(permit); } +void reader_concurrency_semaphore::on_permit_preemptive_aborted() noexcept { + ++_stats.total_reads_shed_due_to_overload; +} + void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) { _permit_gate.enter(); 
_permit_list.push_back(permit); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 3e09d06449..6377244302 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -42,7 +42,7 @@ using mutation_reader_opt = optimized_optional; /// number of waiting readers becomes equal or greater than /// `max_queue_length` (upon calling `obtain_permit()`) an exception of /// type `std::runtime_error` is thrown. Optionally, some additional -/// code can be executed just before throwing (`prethrow_action` +/// code can be executed just before throwing (`prethrow_action` /// constructor parameter). /// /// The semaphore has 3 layers of defense against consuming more memory @@ -89,6 +89,7 @@ public: // Total number of failed reads executed through this semaphore. uint64_t total_failed_reads = 0; // Total number of reads rejected because the admission queue reached its max capacity + // or rejected due to a high probability of not getting finalized on time. uint64_t total_reads_shed_due_to_overload = 0; // Total number of reads killed due to the memory consumption reaching the kill limit. 
uint64_t total_reads_killed_due_to_kill_limit = 0; @@ -250,6 +251,8 @@ private: void on_permit_created(reader_permit::impl&); void on_permit_destroyed(reader_permit::impl&) noexcept; + void on_permit_preemptive_aborted() noexcept; + void on_permit_need_cpu() noexcept; void on_permit_not_need_cpu() noexcept; diff --git a/reader_permit.hh b/reader_permit.hh index c2769c1d37..d72d10b376 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -92,6 +92,7 @@ public: active_await, inactive, evicted, + preemptive_aborted, }; class impl; From 21348050e8444b5942a0425a12587e3ed84417f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paszkowski?= Date: Mon, 16 Dec 2024 14:27:40 +0100 Subject: [PATCH 4/7] config: Add parameters to control reads' preemptive_abort_factor --- db/config.cc | 4 ++++ db/config.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/db/config.cc b/db/config.cc index c40389e1ce..36a27dd995 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1394,6 +1394,10 @@ db::config::config(std::shared_ptr exts) "Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.") , reader_concurrency_semaphore_cpu_concurrency(this, "reader_concurrency_semaphore_cpu_concurrency", liveness::LiveUpdate, value_status::Used, 2, "Admit new reads while there are less than this number of requests that need CPU.") + , reader_concurrency_semaphore_preemptive_abort_factor(this, "reader_concurrency_semaphore_preemptive_abort_factor", liveness::LiveUpdate, value_status::Used, 0.3, + "Admit new reads while their remaining time is more than this factor times their timeout times when arrived to a semaphore. 
Its value means\n" + "* <= 0.0 means new reads will never get rejected during admission\n" + "* >= 1.0 means new reads will always get rejected during admission\n") , view_update_reader_concurrency_semaphore_serialize_limit_multiplier(this, "view_update_reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2, "Start serializing view update reads after their collective memory consumption goes above $normal_limit * $multiplier.") , view_update_reader_concurrency_semaphore_kill_limit_multiplier(this, "view_update_reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4, diff --git a/db/config.hh b/db/config.hh index 73321644a0..5a992fe246 100644 --- a/db/config.hh +++ b/db/config.hh @@ -446,6 +446,7 @@ public: named_value reader_concurrency_semaphore_serialize_limit_multiplier; named_value reader_concurrency_semaphore_kill_limit_multiplier; named_value reader_concurrency_semaphore_cpu_concurrency; + named_value reader_concurrency_semaphore_preemptive_abort_factor; named_value view_update_reader_concurrency_semaphore_serialize_limit_multiplier; named_value view_update_reader_concurrency_semaphore_kill_limit_multiplier; named_value view_update_reader_concurrency_semaphore_cpu_concurrency; From fde09fd136a6b2f555b0c91d9eb931ac41d03055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paszkowski?= Date: Tue, 20 Jan 2026 13:58:47 +0100 Subject: [PATCH 5/7] reader_concurrency_semaphore: Add preemptive_abort_factor to constructors The new parameter parametrizes the factor used to reject a read during admission. Its value shall be between 0.0 and 1.0 where + 0.0 means a read will never get rejected during admission + 1.0 means a read will immediately get rejected during admission Although passing values outside the interval is possible, they will have the exact same effects as if they were clamped to [0.0, 1.0]. 
--- reader_concurrency_semaphore.cc | 3 +++ reader_concurrency_semaphore.hh | 10 ++++++++-- reader_concurrency_semaphore_group.cc | 3 ++- reader_concurrency_semaphore_group.hh | 8 ++++++-- replica/database.cc | 6 +++++- sstables/sstables_manager.cc | 1 + test/boost/reader_concurrency_semaphore_test.cc | 13 ++++++++++++- 7 files changed, 37 insertions(+), 7 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 66c451b9b1..c55da29aa4 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -1057,6 +1057,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore( utils::updateable_value serialize_limit_multiplier, utils::updateable_value kill_limit_multiplier, utils::updateable_value cpu_concurrency, + utils::updateable_value preemptive_abort_factor, register_metrics metrics) : _initial_resources(count, memory) , _resources(count, memory) @@ -1066,6 +1067,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore( , _serialize_limit_multiplier(std::move(serialize_limit_multiplier)) , _kill_limit_multiplier(std::move(kill_limit_multiplier)) , _cpu_concurrency(cpu_concurrency) + , _preemptive_abort_factor(preemptive_abort_factor) , _close_readers_gate(format("[reader_concurrency_semaphore {}] close_readers", _name)) , _permit_gate(format("[reader_concurrency_semaphore {}] permit", _name)) { @@ -1133,6 +1135,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(uint32_t(1)), + utils::updateable_value(float(0.0)), metrics) {} reader_concurrency_semaphore::~reader_concurrency_semaphore() { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 6377244302..6476c89c2d 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -193,6 +193,8 @@ private: utils::updateable_value 
_serialize_limit_multiplier; utils::updateable_value _kill_limit_multiplier; utils::updateable_value _cpu_concurrency; + utils::updateable_value _preemptive_abort_factor; + stats _stats; std::optional _metrics; bool _stopped = false; @@ -290,6 +292,7 @@ public: utils::updateable_value serialize_limit_multiplier, utils::updateable_value kill_limit_multiplier, utils::updateable_value cpu_concurrency, + utils::updateable_value preemptive_abort_factor, register_metrics metrics); reader_concurrency_semaphore( @@ -300,9 +303,11 @@ public: utils::updateable_value serialize_limit_multiplier, utils::updateable_value kill_limit_multiplier, utils::updateable_value cpu_concurrency, + utils::updateable_value preemptive_abort_factor, register_metrics metrics) : reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, - std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), std::move(cpu_concurrency), metrics) + std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), std::move(cpu_concurrency), + std::move(preemptive_abort_factor), metrics) { } /// Create a semaphore with practically unlimited count and memory. 
@@ -322,9 +327,10 @@ public: utils::updateable_value serialize_limit_multipler = utils::updateable_value(std::numeric_limits::max()), utils::updateable_value kill_limit_multipler = utils::updateable_value(std::numeric_limits::max()), utils::updateable_value cpu_concurrency = utils::updateable_value(1), + utils::updateable_value preemptive_abort_factor = utils::updateable_value(0.0f), register_metrics metrics = register_metrics::no) : reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler), - std::move(kill_limit_multipler), std::move(cpu_concurrency), metrics) + std::move(kill_limit_multipler), std::move(cpu_concurrency), std::move(preemptive_abort_factor), metrics) {} virtual ~reader_concurrency_semaphore(); diff --git a/reader_concurrency_semaphore_group.cc b/reader_concurrency_semaphore_group.cc index d3a37ea3f6..e7551ea432 100644 --- a/reader_concurrency_semaphore_group.cc +++ b/reader_concurrency_semaphore_group.cc @@ -70,7 +70,8 @@ reader_concurrency_semaphore& reader_concurrency_semaphore_group::add_or_update( _max_queue_length, _serialize_limit_multiplier, _kill_limit_multiplier, - _cpu_concurrency + _cpu_concurrency, + _preemptive_abort_factor ); auto&& it = result.first; // since we serialize all group changes this change wait will be queues and no further operations diff --git a/reader_concurrency_semaphore_group.hh b/reader_concurrency_semaphore_group.hh index 2000dc6c45..4016355aa4 100644 --- a/reader_concurrency_semaphore_group.hh +++ b/reader_concurrency_semaphore_group.hh @@ -26,6 +26,7 @@ class reader_concurrency_semaphore_group { utils::updateable_value _serialize_limit_multiplier; utils::updateable_value _kill_limit_multiplier; utils::updateable_value _cpu_concurrency; + utils::updateable_value _preemptive_abort_factor; friend class database_test_wrapper; @@ -36,11 +37,12 @@ class reader_concurrency_semaphore_group { weighted_reader_concurrency_semaphore(size_t 
shares, int count, sstring name, size_t max_queue_length, utils::updateable_value serialize_limit_multiplier, utils::updateable_value kill_limit_multiplier, - utils::updateable_value cpu_concurrency) + utils::updateable_value cpu_concurrency, + utils::updateable_value preemptive_abort_factor) : weight(shares) , memory_share(0) , sem(utils::updateable_value(count), 0, name, max_queue_length, std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), - std::move(cpu_concurrency), reader_concurrency_semaphore::register_metrics::yes) {} + std::move(cpu_concurrency), std::move(preemptive_abort_factor), reader_concurrency_semaphore::register_metrics::yes) {} }; std::unordered_map _semaphores; @@ -54,6 +56,7 @@ public: utils::updateable_value serialize_limit_multiplier, utils::updateable_value kill_limit_multiplier, utils::updateable_value cpu_concurrency, + utils::updateable_value preemptive_abort_factor, std::optional name_prefix = std::nullopt) : _total_memory(memory) , _total_weight(0) @@ -62,6 +65,7 @@ public: , _serialize_limit_multiplier(std::move(serialize_limit_multiplier)) , _kill_limit_multiplier(std::move(kill_limit_multiplier)) , _cpu_concurrency(std::move(cpu_concurrency)) + , _preemptive_abort_factor(std::move(preemptive_abort_factor)) , _operations_serializer(1) , _name_prefix(std::move(name_prefix)) { } diff --git a/replica/database.cc b/replica/database.cc index f123c4f7fe..a2523476f3 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -412,6 +412,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(uint32_t(1)), + utils::updateable_value(0.0f), reader_concurrency_semaphore::register_metrics::yes) // No limits, just for accounting. 
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction", reader_concurrency_semaphore::register_metrics::no) @@ -424,6 +425,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(uint32_t(1)), + utils::updateable_value(0.0f), reader_concurrency_semaphore::register_metrics::yes) , _view_update_read_concurrency_semaphores_group( max_memory_concurrent_view_update_reads(), @@ -432,6 +434,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat _cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier, _cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier, _cfg.view_update_reader_concurrency_semaphore_cpu_concurrency, + utils::updateable_value(0.0f), "view_update") , _row_cache_tracker(_cfg.index_cache_fraction.operator utils::updateable_value(), cache_tracker::register_metrics::yes) , _apply_stage("db_apply", &database::do_apply) @@ -461,7 +464,8 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat , _reader_concurrency_semaphores_group(max_memory_concurrent_reads(), max_count_concurrent_reads, max_inactive_queue_length(), _cfg.reader_concurrency_semaphore_serialize_limit_multiplier, _cfg.reader_concurrency_semaphore_kill_limit_multiplier, - _cfg.reader_concurrency_semaphore_cpu_concurrency) + _cfg.reader_concurrency_semaphore_cpu_concurrency, + _cfg.reader_concurrency_semaphore_preemptive_abort_factor) , _stop_barrier(std::move(barrier)) , _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); }) , _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer())) diff --git a/sstables/sstables_manager.cc 
b/sstables/sstables_manager.cc index a4768dc54f..34cdc334ae 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -46,6 +46,7 @@ sstables_manager::sstables_manager( utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(uint32_t(1)), + utils::updateable_value(0.0f), reader_concurrency_semaphore::register_metrics::no) , _dir_semaphore(dir_sem) , _resolve_host_id(std::move(resolve_host_id)) diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 6cc543c635..fde75f5115 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -1226,11 +1226,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_group) { auto serialize_multiplier = utils::updateable_value_source(2); auto kill_multiplier = utils::updateable_value_source(3); auto cpu_concurrency = utils::updateable_value_source(1); + auto preemptive_abort_factor = utils::updateable_value_source(0.0f); reader_concurrency_semaphore_group sem_group(initial_resources.memory, initial_resources.count, 1000, utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), - utils::updateable_value(cpu_concurrency)); + utils::updateable_value(cpu_concurrency), + utils::updateable_value(preemptive_abort_factor)); + auto stop_sem = deferred_stop(sem_group); circular_buffer recycle_bin; @@ -2175,6 +2178,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) { const auto serialize_multiplier = std::numeric_limits::max(); const auto kill_multiplier = std::numeric_limits::max(); const auto cpu_concurrency = 1; + const auto preemptive_abort_factor = 0.0f; reader_concurrency_semaphore semaphore( utils::updateable_value(count), @@ -2184,6 +2188,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) { 
utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), utils::updateable_value(cpu_concurrency), + utils::updateable_value(preemptive_abort_factor), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); @@ -2203,6 +2208,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu const uint32_t initial_memory = 4 * 1024; const auto serialize_multiplier = std::numeric_limits::max(); const auto kill_multiplier = std::numeric_limits::max(); + const auto preemptive_abort_factor = 0.0f; reader_concurrency_semaphore semaphore( utils::updateable_value(initial_count), @@ -2212,6 +2218,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier), utils::updateable_value(cpu_concurrency), + utils::updateable_value(preemptive_abort_factor), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); @@ -2264,6 +2271,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_wait_queue_overload_c utils::updateable_value(2), utils::updateable_value(4), utils::updateable_value(1), + utils::updateable_value(0.0f), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); @@ -2317,6 +2325,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort) utils::updateable_value(2), utils::updateable_value(400), utils::updateable_value(2), + utils::updateable_value(0.0f), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); @@ -2381,6 +2390,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_always_admit_one_perm utils::updateable_value(200), utils::updateable_value(400), utils::updateable_value(1), + utils::updateable_value(0.0f), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); 
@@ -2422,6 +2432,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource utils::updateable_value(200), utils::updateable_value(400), utils::updateable_value(1), + utils::updateable_value(0.0f), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); From 8a613960afb1ce9b75d470717c5049b6d6d5023d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paszkowski?= Date: Wed, 28 Jan 2026 14:15:41 +0100 Subject: [PATCH 6/7] reader_permit::impl: Replace break with return after evicting inactive permit on timeout Evicting an inactive permit destroys the permit object when the reader is closed, making any further member access invalid. Switch from break to an early return to prevent any possible use-after-free after evict() in the state::inactive timeout path. --- reader_concurrency_semaphore.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index c55da29aa4..c4c66cbce5 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -237,7 +237,9 @@ private: break; case state::inactive: _semaphore.evict(*this, reader_concurrency_semaphore::evict_reason::time); - break; + // Return here on purpose. The evicted permit is destroyed when closing a reader. + // As a consequence, any member access beyond this point is invalid. + return; case state::evicted: case state::preemptive_aborted: break; From 7e1bbbd937c036a908305c0384bc7f19f716d9cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Paszkowski?= Date: Wed, 20 Nov 2024 15:42:44 +0100 Subject: [PATCH 7/7] reader_concurrency_semaphore: Check during admission if read may time out When a shard on a replica is overloaded, it breaks down completely, throughput collapses, latencies go through the roof and the node/shard can even become completely unresponsive to new connection attempts. 
When reads arrive, they have to wait for admission on the reader concurrency semaphore. If the node is overloaded, the reads will be queued and thus they can time out while being in the queue or during the execution. In the latter case, the timeout does not always result in the read being aborted. Once the shard is sufficiently loaded, it is possible that most queued reads will time out, because the average time it takes for a queued read to be admitted is around that of the timeout. If a read times out, any work we already did, or are about to do on it is wasted effort. Therefore, the patch tries to prevent it by checking if an admitted read has a chance to complete in time and aborting it if not. It uses the following criterion: if the read's remaining time <= the read's timeout when it arrived at the semaphore * preemptive factor, the read is rejected and the next one from the wait list is considered. --- reader_concurrency_semaphore.cc | 30 ++++++++++++++-- test/boost/database_test.cc | 1 + .../reader_concurrency_semaphore_test.cc | 35 ++++++++++++++++++- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index c4c66cbce5..82e87adda2 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -148,6 +148,7 @@ public: }; private: + const db::timeout_clock::time_point _created; reader_concurrency_semaphore& _semaphore; schema_ptr _schema; @@ -254,7 +255,8 @@ public: struct value_tag {}; impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) - : _semaphore(semaphore) + : _created(db::timeout_clock::now()) + , _semaphore(semaphore) , _schema(std::move(schema)) , _op_name_view(op_name) , _base_resources(base_resources) @@ -265,7 +267,8 @@ public: _semaphore.on_permit_created(*this); } impl(reader_concurrency_semaphore& semaphore, 
schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) - : _semaphore(semaphore) + : _created(db::timeout_clock::now()) + , _semaphore(semaphore) , _schema(std::move(schema)) , _op_name(std::move(op_name)) , _op_name_view(_op_name) @@ -485,6 +488,10 @@ public: return _semaphore.do_wait_admission(*this); } + db::timeout_clock::time_point created() const noexcept { + return _created; + } + db::timeout_clock::time_point timeout() const noexcept { return _ttl_timer.armed() ? _ttl_timer.get_timeout() : db::no_timeout; } @@ -1513,6 +1520,25 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { auto& permit = _wait_list.front(); dequeue_permit(permit); try { + // Do not admit the read as it is unlikely to finish before its timeout. The condition is: + // permit's remaining time <= preemptive_abort_factor * permit's time budget + // + // The additional check for remaining_time > 0 is to avoid preemptive aborting reads + // that already timed out but are still in the wait list due to scheduling delays. + // It also effectively disables preemptive aborting when the factor is set to 0. 
+ const auto time_budget = permit.timeout() - permit.created(); + const auto remaining_time = permit.timeout() - db::timeout_clock::now(); + if (remaining_time > db::timeout_clock::duration::zero() && remaining_time <= _preemptive_abort_factor() * time_budget) { + permit.on_preemptive_aborted(); + using ms = std::chrono::milliseconds; + tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})", + _name, + std::chrono::duration_cast(time_budget - remaining_time), + std::chrono::duration_cast(time_budget), + _preemptive_abort_factor()); + continue; + } + if (permit.get_state() == reader_permit::state::waiting_for_memory) { _blessed_permit = &permit; permit.on_granted_memory(); diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index f52f5c273d..13e69ec8b8 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -2152,6 +2152,7 @@ struct scoped_execption_log_level { SEASTAR_TEST_CASE(replica_read_timeout_no_exception) { cql_test_config cfg; + cfg.db_config->reader_concurrency_semaphore_preemptive_abort_factor.set(0.0); const auto read_timeout = 10ms; const auto write_timeout = 10s; cfg.query_timeout.emplace(timeout_config{ diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index fde75f5115..14e1fb1884 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -517,6 +517,38 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) { }); } +SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_abort) { + const auto preemptive_abort_factor = 0.5f; + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost, + 100, utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value(1), 
utils::updateable_value(preemptive_abort_factor)); + auto stop_sem = deferred_stop(semaphore); + + { + BOOST_REQUIRE(semaphore.get_stats().total_reads_shed_due_to_overload == 0); + + auto timeout = db::timeout_clock::now() + 500ms; + + reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout, {}).get(); + + auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout, {}); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1); + + // The permits are rejected when the remaining time is less than half of their timeout when they arrived at the semaphore. + // Hence, sleep 300ms to reject the permits in the waitlist during admission. + seastar::sleep(300ms).get(); + + permit1 = {}; + const auto futures_failed = eventually_true([&] { return permit2_fut.failed(); }); + BOOST_CHECK(futures_failed); + BOOST_CHECK_THROW(std::rethrow_exception(permit2_fut.get_exception()), semaphore_aborted); + BOOST_CHECK(semaphore.get_stats().total_reads_shed_due_to_overload > 0); + } + + // All units should have been deposited back. + REQUIRE_EVENTUALLY_EQUAL([&] { return semaphore.available_resources().memory; }, replica::new_reader_base_cost); +} + SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) { return async([&] () { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost, 2); @@ -597,7 +629,8 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { permit.resources = permit.permit->consume_resources(reader_resources(tests::random::get_int(0, 1), tests::random::get_int(1024, 16 * 1024 * 1024))); } else { - const auto timeout_seconds = tests::random::get_int(0, 3); + // Ensure timeout_seconds > 0 to avoid permits being rejected during admission; otherwise the test becomes flaky. + const auto timeout_seconds = tests::random::get_int(1, 4); permit.permit_fut = semaphore.obtain_permit( schema,