diff --git a/database.cc b/database.cc index 0184820aea..c03579a6e3 100644 --- a/database.cc +++ b/database.cc @@ -199,19 +199,21 @@ database::database(const db::config& cfg, database_config dbcfg) })) , _read_concurrency_sem(max_count_concurrent_reads, max_memory_concurrent_reads(), + "_read_concurrency_sem", max_inactive_queue_length(), [this] { ++_stats->sstable_read_queue_overloaded; - return std::make_exception_ptr(std::runtime_error("sstable read queue overloaded")); }) // No timeouts or queue length limits - a failure here can kill an entire repair. // Trust the caller to limit concurrency. , _streaming_concurrency_sem( max_count_streaming_concurrent_reads, - max_memory_streaming_concurrent_reads()) + max_memory_streaming_concurrent_reads(), + "_streaming_concurrency_sem") , _system_read_concurrency_sem( max_count_system_concurrent_reads, - max_memory_system_concurrent_reads()) + max_memory_system_concurrent_reads(), + "_system_read_concurrency_sem") , _data_query_stage("data_query", &column_family::query) , _mutation_query_stage() , _apply_stage("db_apply", &database::do_apply) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 2339005dfb..4a4ce5c3ca 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -20,6 +20,7 @@ */ #include +#include #include "reader_concurrency_semaphore.hh" @@ -77,7 +78,12 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read() { future> reader_concurrency_semaphore::wait_admission(size_t memory, db::timeout_clock::time_point timeout) { if (_wait_list.size() >= _max_queue_length) { - return make_exception_future>(_make_queue_overloaded_exception()); + if (_prethrow_action) { + _prethrow_action(); + } + return make_exception_future>( + std::make_exception_ptr(std::runtime_error( + format("{}: restricted mutation reader queue overload", _name)))); } auto r = resources(1, static_cast(memory)); auto it = _inactive_reads.begin(); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index ab25661fb8..83c0eef2f6 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -39,16 +39,16 @@ /// construction. New readers will only be admitted when there is both /// enough count and memory units available. Readers are admitted in /// FIFO order. -/// It's possible to specify the maximum allowed number of waiting +/// Semaphore's `name` must be provided in ctor and its only purpose is +/// to increase readability of exceptions: both timeout exceptions and +/// queue overflow exceptions (read below) include this `name` in messages. +/// It's also possible to specify the maximum allowed number of waiting /// readers by the `max_queue_length` constructor parameter. When the -/// number waiting readers would be equal or greater than this number -/// (when calling `wait_admission()`) an exception will be thrown. -/// The type of the exception and optionally some additional code -/// that should be executed when this happens can be customized by the -/// `raise_queue_overloaded_exception` constructor parameter. This -/// function will be called every time the queue limit is surpassed. -/// It is expected to return an `std::exception_ptr` that will be -/// injected into the future. +/// number of waiting readers becomes equal or greater than +/// `max_queue_length` (upon calling `wait_admission()`) an exception of +/// type `std::runtime_error` is thrown. Optionally, some additional +/// code can be executed just before throwing (`prethrow_action` +/// constructor parameter). class reader_concurrency_semaphore { public: struct resources { @@ -151,9 +151,14 @@ private: resources res; entry(promise>&& pr, resources r) : pr(std::move(pr)), res(r) {} }; - struct expiry_handler { + + class expiry_handler { + sstring _semaphore_name; + public: + explicit expiry_handler(sstring semaphore_name) + : _semaphore_name(std::move(semaphore_name)) {} void operator()(entry& e) noexcept { - e.pr.set_exception(semaphore_timed_out()); + e.pr.set_exception(named_semaphore_timed_out(_semaphore_name)); } }; @@ -162,17 +167,14 @@ private: expiring_fifo _wait_list; + sstring _name; size_t _max_queue_length = std::numeric_limits::max(); - std::function _make_queue_overloaded_exception; + std::function _prethrow_action; uint64_t _next_id = 1; std::map> _inactive_reads; inactive_read_stats _inactive_read_stats; private: - static std::exception_ptr default_make_queue_overloaded_exception() { - return std::make_exception_ptr(std::runtime_error("restricted mutation reader queue overload")); - } - bool has_available_units(const resources& r) const { return bool(_resources) && _resources >= r; } @@ -195,19 +197,23 @@ public: reader_concurrency_semaphore(int count, ssize_t memory, + sstring name, size_t max_queue_length = std::numeric_limits::max(), - std::function raise_queue_overloaded_exception = default_make_queue_overloaded_exception) + std::function prethrow_action = nullptr) : _resources(count, memory) + , _wait_list(expiry_handler(name)) + , _name(std::move(name)) , _max_queue_length(max_queue_length) - , _make_queue_overloaded_exception(raise_queue_overloaded_exception) { - } + , _prethrow_action(std::move(prethrow_action)) {} /// Create a semaphore with practically unlimited count and memory. /// /// And conversely, no queue limit either. explicit reader_concurrency_semaphore(no_limits) - : reader_concurrency_semaphore(std::numeric_limits::max(), std::numeric_limits::max()) { - } + : reader_concurrency_semaphore( + std::numeric_limits::max(), + std::numeric_limits::max(), + "unlimited reader_concurrency_semaphore") {} reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete; reader_concurrency_semaphore& operator=(const reader_concurrency_semaphore&) = delete; diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index e4c60c8452..c751d7a7d6 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -1079,7 +1079,7 @@ class dummy_file_impl : public file_impl { SEASTAR_TEST_CASE(reader_restriction_file_tracking) { return async([&] { - reader_concurrency_semaphore semaphore(100, 4 * 1024); + reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name()); // Testing the tracker here, no need to have a base cost. auto permit = semaphore.wait_admission(0).get0(); @@ -1139,7 +1139,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) { SEASTAR_TEST_CASE(restricted_reader_reading) { return sstables::test_env::do_with_async([&] (sstables::test_env& env) { storage_service_for_tests ssft; - reader_concurrency_semaphore semaphore(2, new_reader_base_cost); + reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name()); { simple_schema s; @@ -1210,7 +1210,7 @@ SEASTAR_TEST_CASE(restricted_reader_reading) { SEASTAR_TEST_CASE(restricted_reader_timeout) { return sstables::test_env::do_with_async([&] (sstables::test_env& env) { storage_service_for_tests ssft; - reader_concurrency_semaphore semaphore(2, new_reader_base_cost); + reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name()); { simple_schema s; @@ -1256,9 +1256,7 @@ SEASTAR_TEST_CASE(restricted_reader_max_queue_length) { return sstables::test_env::do_with_async([&] (sstables::test_env& env) { storage_service_for_tests ssft; - struct queue_overloaded_exception {}; - - reader_concurrency_semaphore semaphore(2, new_reader_base_cost, 2, [] { return std::make_exception_ptr(queue_overloaded_exception()); }); + reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name(), 2); { simple_schema s; @@ -1279,7 +1277,7 @@ SEASTAR_TEST_CASE(restricted_reader_max_queue_length) { BOOST_REQUIRE_EQUAL(semaphore.waiters(), 2); // The queue should now be full. - BOOST_REQUIRE_THROW(reader4().get(), queue_overloaded_exception); + BOOST_REQUIRE_THROW(reader4().get(), std::runtime_error); reader1_ptr.reset(); read2_fut.get(); @@ -1294,7 +1292,7 @@ SEASTAR_TEST_CASE(restricted_reader_max_queue_length) { SEASTAR_TEST_CASE(restricted_reader_create_reader) { return sstables::test_env::do_with_async([&] (sstables::test_env& env) { storage_service_for_tests ssft; - reader_concurrency_semaphore semaphore(100, new_reader_base_cost); + reader_concurrency_semaphore semaphore(100, new_reader_base_cost, get_name()); { simple_schema s; @@ -1327,8 +1325,8 @@ SEASTAR_TEST_CASE(restricted_reader_create_reader) { } SEASTAR_TEST_CASE(test_restricted_reader_as_mutation_source) { - return seastar::async([] { - reader_concurrency_semaphore semaphore(100, 10 * new_reader_base_cost); + return seastar::async([test_name = get_name()] { + reader_concurrency_semaphore semaphore(100, 10 * new_reader_base_cost, test_name); auto make_restricted_populator = [&semaphore](schema_ptr s, const std::vector &muts) { auto mt = make_lw_shared(s); @@ -2067,7 +2065,8 @@ public: const auto shard = engine().cpu_id(); if (!_contexts[shard].semaphore) { if (_evict_paused_readers) { - _contexts[shard].semaphore = make_foreign(std::make_unique(0, std::numeric_limits::max())); + _contexts[shard].semaphore = make_foreign(std::make_unique(0, std::numeric_limits::max(), + format("reader_concurrency_semaphore @shard_id={}", shard))); // Add a waiter, so that all registered inactive reads are // immediately evicted. // We don't care about the returned future.