diff --git a/db/config.cc b/db/config.cc index 5af8abd7a8..c2d9e0705f 100644 --- a/db/config.cc +++ b/db/config.cc @@ -843,6 +843,10 @@ db::config::config(std::shared_ptr exts) , max_memory_for_unlimited_query_hard_limit(this, "max_memory_for_unlimited_query_hard_limit", "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, (uint64_t(100) << 20), "Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries. " "This is the hard limit, queries violating this limit will be aborted.") + , reader_concurrency_semaphore_serialize_limit_multiplier(this, "reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2, + "Start serializing reads after their collective memory consumption goes above $normal_limit * $multiplier.") + , reader_concurrency_semaphore_kill_limit_multiplier(this, "reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4, + "Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.") , twcs_max_window_count(this, "twcs_max_window_count", liveness::LiveUpdate, value_status::Used, 50, "The maximum number of compaction windows allowed when making use of TimeWindowCompactionStrategy. 
A setting of 0 effectively disables the restriction.") , initial_sstable_loading_concurrency(this, "initial_sstable_loading_concurrency", value_status::Used, 4u, diff --git a/db/config.hh b/db/config.hh index 8bc50d5a01..178640acad 100644 --- a/db/config.hh +++ b/db/config.hh @@ -339,6 +339,8 @@ public: named_value max_clustering_key_restrictions_per_query; named_value max_memory_for_unlimited_query_soft_limit; named_value max_memory_for_unlimited_query_hard_limit; + named_value reader_concurrency_semaphore_serialize_limit_multiplier; + named_value reader_concurrency_semaphore_kill_limit_multiplier; named_value twcs_max_window_count; named_value initial_sstable_loading_concurrency; named_value enable_3_1_0_compatibility_mode; diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index bd3275e12e..81af115aa3 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -26,9 +26,14 @@ std::ostream& operator<<(std::ostream& os , const reader_resources& r) { return os; } -reader_permit::resource_units::resource_units(reader_permit permit, reader_resources res) noexcept +reader_permit::resource_units::resource_units(reader_permit permit, reader_resources res, already_consumed_tag) : _permit(std::move(permit)), _resources(res) { +} + +reader_permit::resource_units::resource_units(reader_permit permit, reader_resources res) + : _permit(std::move(permit)) { _permit.consume(res); + _resources = res; } reader_permit::resource_units::resource_units(resource_units&& o) noexcept @@ -58,7 +63,9 @@ void reader_permit::resource_units::add(resource_units&& o) { } void reader_permit::resource_units::reset(reader_resources res) { - _permit.consume(res); + if (res.non_zero()) { + _permit.consume(res); + } if (_resources.non_zero()) { _permit.signal(_resources); } @@ -83,6 +90,9 @@ class reader_permit::impl db::timeout_clock::time_point _timeout; query::max_result_size 
_max_result_size{query::result_memory_limiter::unlimited_result_size}; uint64_t _sstables_read = 0; + size_t _requested_memory = 0; + std::optional> _memory_future; + uint64_t _oom_kills = 0; private: void on_permit_used() { @@ -105,6 +115,10 @@ private: if (_used_branches) { _state = reader_permit::state::active_used; on_permit_used(); + if (_blocked_branches) { + _state = reader_permit::state::active_blocked; + on_permit_blocked(); + } } else { _state = reader_permit::state::active_unused; } @@ -112,6 +126,9 @@ private: void on_permit_inactive(reader_permit::state st) { _state = st; + if (_marked_as_blocked) { + on_permit_unblocked(); + } if (_marked_as_used) { on_permit_unused(); } @@ -193,8 +210,13 @@ public: return _state; } - void on_waiting() { - on_permit_inactive(reader_permit::state::waiting); + void on_waiting_for_admission() { + on_permit_inactive(reader_permit::state::waiting_for_admission); + } + + void on_waiting_for_memory(future<> fut) { + on_permit_inactive(reader_permit::state::waiting_for_memory); + _memory_future.emplace(std::move(fut)); } void on_admission() { @@ -204,6 +226,17 @@ public: _base_resources_consumed = true; } + void on_granted_memory() { + if (_state == reader_permit::state::waiting_for_memory) { + on_permit_active(); + } + consume({0, std::exchange(_requested_memory, 0)}); + } + + future<> get_memory_future() { + return _memory_future->get_future(); + } + void on_register_as_inactive() { assert(_state == reader_permit::state::active_unused || _state == reader_permit::state::active_used); on_permit_inactive(reader_permit::state::inactive); @@ -224,8 +257,8 @@ public: } void consume(reader_resources res) { + _semaphore.consume(*this, res); _resources += res; - _semaphore.consume(res); } void signal(reader_resources res) { @@ -233,6 +266,13 @@ public: _semaphore.signal(res); } + future request_memory(size_t memory) { + _requested_memory += memory; + return _semaphore.request_memory(*this, memory).then([this, memory] { + return 
resource_units(reader_permit(shared_from_this()), {0, ssize_t(memory)}, resource_units::already_consumed_tag{}); + }); + } + reader_resources resources() const { return _resources; } @@ -347,6 +387,10 @@ public: --_semaphore._stats.disk_reads; } } + + bool on_oom_kill() noexcept { + return !bool(_oom_kills++); + } }; static_assert(std::is_nothrow_copy_constructible_v); @@ -368,14 +412,26 @@ reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const sche { } -void reader_permit::on_waiting() { - _impl->on_waiting(); +void reader_permit::on_waiting_for_admission() { + _impl->on_waiting_for_admission(); +} + +void reader_permit::on_waiting_for_memory(future<> fut) { + _impl->on_waiting_for_memory(std::move(fut)); } void reader_permit::on_admission() { _impl->on_admission(); } +void reader_permit::on_granted_memory() { + _impl->on_granted_memory(); +} + +future<> reader_permit::get_memory_future() { + return _impl->get_memory_future(); +} + reader_permit::~reader_permit() { } @@ -383,6 +439,10 @@ reader_concurrency_semaphore& reader_permit::semaphore() { return _impl->semaphore(); } +reader_permit::state reader_permit::get_state() const { + return _impl->get_state(); +} + bool reader_permit::needs_readmission() const { return _impl->needs_readmission(); } @@ -407,6 +467,10 @@ reader_permit::resource_units reader_permit::consume_resources(reader_resources return resource_units(*this, res); } +future reader_permit::request_memory(size_t memory) { + return _impl->request_memory(memory); +} + reader_resources reader_permit::consumed_resources() const { return _impl->resources(); } @@ -465,8 +529,11 @@ void reader_permit::on_finish_sstable_read() noexcept { std::ostream& operator<<(std::ostream& os, reader_permit::state s) { switch (s) { - case reader_permit::state::waiting: - os << "waiting"; + case reader_permit::state::waiting_for_admission: + os << "waiting_for_admission"; + break; + case reader_permit::state::waiting_for_memory: + os << 
"waiting_for_memory"; break; case reader_permit::state::active_unused: os << "active/unused"; @@ -658,18 +725,49 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept { } } +uint64_t reader_concurrency_semaphore::get_serialize_limit() const { + if (!_serialize_limit_multiplier() || _serialize_limit_multiplier() == std::numeric_limits::max() || is_unlimited()) [[unlikely]] { + return std::numeric_limits::max(); + } + return _initial_resources.memory * _serialize_limit_multiplier(); +} + +uint64_t reader_concurrency_semaphore::get_kill_limit() const { + if (!_kill_limit_multiplier() || _kill_limit_multiplier() == std::numeric_limits::max() || is_unlimited()) [[unlikely]] { + return std::numeric_limits::max(); + } + return _initial_resources.memory * _kill_limit_multiplier(); +} + +void reader_concurrency_semaphore::consume(reader_permit::impl& permit, resources r) { + // We check whether we even reached the memory limit first. + // This is a cheap check and should be false most of the time, providing a + // cheap short-circuit. 
+ if (_resources.memory <= 0 && (consumed_resources().memory + r.memory) >= get_kill_limit()) [[unlikely]] { + if (permit.on_oom_kill()) { + ++_stats.total_reads_killed_due_to_kill_limit; + } + maybe_dump_reader_permit_diagnostics(*this, _permit_list, "kill limit triggered"); + throw std::bad_alloc(); + } + _resources -= r; +} + void reader_concurrency_semaphore::signal(const resources& r) noexcept { _resources += r; maybe_admit_waiters(); } -reader_concurrency_semaphore::reader_concurrency_semaphore(int count, ssize_t memory, sstring name, size_t max_queue_length) +reader_concurrency_semaphore::reader_concurrency_semaphore(int count, ssize_t memory, sstring name, size_t max_queue_length, + utils::updateable_value serialize_limit_multiplier, utils::updateable_value kill_limit_multiplier) : _initial_resources(count, memory) , _resources(count, memory) , _wait_list(expiry_handler(*this)) , _ready_list(max_queue_length) , _name(std::move(name)) , _max_queue_length(max_queue_length) + , _serialize_limit_multiplier(std::move(serialize_limit_multiplier)) + , _kill_limit_multiplier(std::move(kill_limit_multiplier)) { } reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring name) @@ -677,7 +775,9 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na std::numeric_limits::max(), std::numeric_limits::max(), std::move(name), - std::numeric_limits::max()) {} + std::numeric_limits::max(), + utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value(std::numeric_limits::max())) {} reader_concurrency_semaphore::~reader_concurrency_semaphore() { if (!_stats.total_permits) { @@ -879,16 +979,23 @@ std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_vi return {}; } -future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read_func func) { +future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read_func func, wait_on wait) { if (auto ex = 
check_queue_size("wait")) { return make_exception_future<>(std::move(ex)); } promise<> pr; auto fut = pr.get_future(); - permit.on_waiting(); auto timeout = permit.timeout(); - _wait_list.push_back(entry(std::move(pr), std::move(permit), std::move(func)), timeout); - ++_stats.reads_enqueued; + if (wait == wait_on::admission) { + permit.on_waiting_for_admission(); + _wait_list.push_to_admission_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout); + ++_stats.reads_enqueued_for_admission; + } else { + permit.on_waiting_for_memory(std::move(fut)); + fut = permit.get_memory_future(); + _wait_list.push_to_memory_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout); + ++_stats.reads_enqueued_for_memory; + } return fut; } @@ -914,6 +1021,26 @@ void reader_concurrency_semaphore::evict_readers_in_background() { reader_concurrency_semaphore::can_admit reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const noexcept { + if (_resources.memory < 0) [[unlikely]] { + const auto consumed_memory = consumed_resources().memory; + if (consumed_memory >= get_kill_limit()) { + return can_admit::no; + } + if (consumed_memory >= get_serialize_limit()) { + if (_blessed_permit) { + // blessed permit is never in the wait list + return can_admit::no; + } else { + auto res = permit.get_state() == reader_permit::state::waiting_for_memory ? 
can_admit::yes : can_admit::no; + return res; + } + } + } + + if (permit.get_state() == reader_permit::state::waiting_for_memory) { + return can_admit::yes; + } + if (!_ready_list.empty()) { return can_admit::no; } @@ -940,7 +1067,7 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r const auto admit = can_admit_read(permit); if (admit != can_admit::yes || !_wait_list.empty()) { - auto fut = enqueue_waiter(std::move(permit), std::move(func)); + auto fut = enqueue_waiter(std::move(permit), std::move(func), wait_on::admission); if (admit == can_admit::yes && !_wait_list.empty()) { // This is a contradiction: the semaphore could admit waiters yet it has waiters. // Normally, the semaphore should admit waiters as soon as it can. @@ -967,8 +1094,13 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit)) == can_admit::yes) { auto& x = _wait_list.front(); try { - x.permit.on_admission(); - ++_stats.reads_admitted; + if (x.permit.get_state() == reader_permit::state::waiting_for_memory) { + _blessed_permit = x.permit._impl.get(); + x.permit.on_granted_memory(); + } else { + x.permit.on_admission(); + ++_stats.reads_admitted; + } if (x.func) { _ready_list.push(std::move(x)); } else { @@ -985,6 +1117,29 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { } } +future<> reader_concurrency_semaphore::request_memory(reader_permit::impl& permit, size_t memory) { + // Already blocked on memory? 
+ if (permit.get_state() == reader_permit::state::waiting_for_memory) { + return permit.get_memory_future(); + } + + if (_resources.memory > 0 || (consumed_resources().memory + memory) < get_serialize_limit()) { + permit.on_granted_memory(); + return make_ready_future<>(); + } + + if (!_blessed_permit) { + _blessed_permit = &permit; + } + + if (_blessed_permit == &permit) { + permit.on_granted_memory(); + return make_ready_future<>(); + } + + return enqueue_waiter(reader_permit(permit.shared_from_this()), {}, wait_on::memory); +} + void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) { _permit_gate.enter(); _permit_list.push_back(permit); @@ -996,6 +1151,10 @@ void reader_concurrency_semaphore::on_permit_destroyed(reader_permit::impl& perm permit.unlink(); _permit_gate.leave(); --_stats.current_permits; + if (_blessed_permit == &permit) { + _blessed_permit = nullptr; + maybe_admit_waiters(); + } } void reader_concurrency_semaphore::on_permit_used() noexcept { @@ -1082,6 +1241,12 @@ std::string reader_concurrency_semaphore::dump_diagnostics(unsigned max_lines) c return os.str(); } +void reader_concurrency_semaphore::foreach_permit(noncopyable_function func) { + for (auto& p : _permit_list) { + func(reader_permit(p.shared_from_this())); + } +} + // A file that tracks the memory usage of buffers resulting from read // operations. 
class tracking_file_impl : public file_impl { @@ -1153,8 +1318,10 @@ public: } virtual future> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) override { - return get_file_impl(_tracked_file)->dma_read_bulk(offset, range_size, pc).then([this, units = _permit.consume_memory(range_size)] (temporary_buffer buf) { - return make_ready_future>(make_tracked_temporary_buffer(std::move(buf), _permit)); + return _permit.request_memory(range_size).then([this, offset, range_size, &pc] (reader_permit::resource_units units) { + return get_file_impl(_tracked_file)->dma_read_bulk(offset, range_size, pc).then([this, units = std::move(units)] (temporary_buffer buf) mutable { + return make_ready_future>(make_tracked_temporary_buffer(std::move(buf), std::move(units))); + }); }); } }; diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index c7b2445723..bffbff1876 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -15,6 +15,7 @@ #include #include "reader_permit.hh" #include "readers/flat_mutation_reader_v2.hh" +#include "utils/updateable_value.hh" namespace bi = boost::intrusive; @@ -41,6 +42,21 @@ using namespace seastar; /// code can be executed just before throwing (`prethrow_action` /// constructor parameter). /// +/// The semaphore has 3 layers of defense against consuming more memory +/// than desired: +/// 1) After memory consumption is larger than the configured memory limit, +/// no more reads are admitted +/// 2) After memory consumption is larger than `_serialize_limit_multiplier` +/// times the configured memory limit, reads are serialized: only one of them +/// is allowed to make progress, the rest are made to wait before they can +/// consume more memory. Enforced via `request_memory()`. +/// 3) After memory consumption is larger than `_kill_limit_multiplier` +/// times the configured memory limit, reads are killed, by `consume()` +/// throwing `std::bad_alloc`. 
+/// +/// This makes `_kill_limit_multiplier` times the memory limit the effective +/// upper bound of the memory consumed by reads. +/// /// The semaphore also acts as an execution stage for reads. This /// functionality is exposed via \ref with_permit() and \ref /// with_ready_permit(). @@ -71,10 +87,14 @@ public: uint64_t total_failed_reads = 0; // Total number of reads rejected because the admission queue reached its max capacity uint64_t total_reads_shed_due_to_overload = 0; + // Total number of reads killed due to the memory consumption reaching the kill limit. + uint64_t total_reads_killed_due_to_kill_limit = 0; // Total number of reads admitted, via all admission paths. uint64_t reads_admitted = 0; // Total number of reads enqueued to wait for admission. - uint64_t reads_enqueued = 0; + uint64_t reads_enqueued_for_admission = 0; + // Total number of reads enqueued to wait for memory. + uint64_t reads_enqueued_for_memory = 0; // Total number of permits created so far. uint64_t total_permits = 0; // Current number of permits. @@ -177,11 +197,48 @@ private: resources _initial_resources; resources _resources; - expiring_fifo _wait_list; + class wait_queue { + // Stores entries for permits waiting to be admitted. + expiring_fifo _admission_queue; + // Stores entries for serialized permits waiting to obtain memory. 
+ expiring_fifo _memory_queue; + public: + wait_queue(expiry_handler eh) : _admission_queue(eh), _memory_queue(eh) { } + size_t size() const { + return _admission_queue.size() + _memory_queue.size(); + } + bool empty() const { + return _admission_queue.empty() && _memory_queue.empty(); + } + void push_to_admission_queue(entry&& e, db::timeout_clock::time_point timeout) { + _admission_queue.push_back(std::move(e), timeout); + } + void push_to_memory_queue(entry&& e, db::timeout_clock::time_point timeout) { + _memory_queue.push_back(std::move(e), timeout); + } + entry& front() { + if (_memory_queue.empty()) { + return _admission_queue.front(); + } else { + return _memory_queue.front(); + } + } + void pop_front() { + if (_memory_queue.empty()) { + _admission_queue.pop_front(); + } else { + _memory_queue.pop_front(); + } + } + }; + + wait_queue _wait_list; queue _ready_list; sstring _name; size_t _max_queue_length = std::numeric_limits::max(); + utils::updateable_value _serialize_limit_multiplier; + utils::updateable_value _kill_limit_multiplier; inactive_reads_type _inactive_reads; stats _stats; permit_list_type _permit_list; @@ -190,6 +247,7 @@ private: gate _close_readers_gate; gate _permit_gate; std::optional> _execution_loop_future; + reader_permit::impl* _blessed_permit = nullptr; private: void do_detach_inactive_reader(inactive_read&, evict_reason reason) noexcept; @@ -204,7 +262,8 @@ private: // Add the permit to the wait queue and return the future which resolves when // the permit is admitted (popped from the queue). - future<> enqueue_waiter(reader_permit permit, read_func func); + enum class wait_on { admission, memory }; + future<> enqueue_waiter(reader_permit permit, read_func func, wait_on wait); void evict_readers_in_background(); future<> do_wait_admission(reader_permit permit, read_func func = {}); @@ -218,6 +277,16 @@ private: void maybe_admit_waiters() noexcept; + // Request more memory for the permit. 
+ // Request is instantly granted while memory consumption of all reads is + // below _serialize_limit_multiplier times the memory limit. + // After memory consumption goes above that limit, only one reader + // (permit) is allowed to make progress, this method will block for all + // others, until: + // * The blessed read finishes and a new blessed permit is chosen. + // * Memory consumption falls below the limit. + future<> request_memory(reader_permit::impl& permit, size_t memory); + void on_permit_created(reader_permit::impl&); void on_permit_destroyed(reader_permit::impl&) noexcept; @@ -234,6 +303,13 @@ private: future<> execution_loop() noexcept; + uint64_t get_serialize_limit() const; + uint64_t get_kill_limit() const; + + // Throws std::bad_alloc if memory consumed is more than _kill_limit_multiplier times the memory limit. + void consume(reader_permit::impl& permit, resources r); + void signal(const resources& r) noexcept; + public: struct no_limits { }; @@ -243,7 +319,9 @@ public: reader_concurrency_semaphore(int count, ssize_t memory, sstring name, - size_t max_queue_length); + size_t max_queue_length, + utils::updateable_value serialize_limit_multiplier, + utils::updateable_value kill_limit_multiplier); /// Create a semaphore with practically unlimited count and memory. 
/// @@ -258,8 +336,10 @@ public: reader_concurrency_semaphore(for_tests, sstring name, int count = std::numeric_limits::max(), ssize_t memory = std::numeric_limits::max(), - size_t max_queue_length = std::numeric_limits::max()) - : reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length) + size_t max_queue_length = std::numeric_limits::max(), + utils::updateable_value serialize_limit_multipler = utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value kill_limit_multipler = utils::updateable_value(std::numeric_limits::max())) + : reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler), std::move(kill_limit_multipler)) {} virtual ~reader_concurrency_semaphore(); @@ -427,12 +507,6 @@ public: return _initial_resources - _resources; } - void consume(resources r) { - _resources -= r; - } - - void signal(const resources& r) noexcept; - size_t waiters() const { return _wait_list.size(); } @@ -452,4 +526,6 @@ public: uint64_t active_reads() const noexcept { return _stats.current_permits - _stats.inactive_reads - waiters(); } + + void foreach_permit(noncopyable_function func); }; diff --git a/reader_permit.hh b/reader_permit.hh index 4303cc328b..11ff44100a 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -79,7 +79,8 @@ public: class blocked_guard; enum class state { - waiting, // waiting for admission + waiting_for_admission, + waiting_for_memory, active_unused, active_used, active_blocked, @@ -100,8 +101,12 @@ private: explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout); - void on_waiting(); + void on_waiting_for_admission(); + void on_waiting_for_memory(future<> f); void on_admission(); + void on_granted_memory(); + + future<> get_memory_future(); void mark_used() noexcept; @@ -130,6 +135,8 @@ public: reader_concurrency_semaphore& 
semaphore(); + state get_state() const; + bool needs_readmission() const; // Call only when needs_readmission() = true. @@ -143,6 +150,8 @@ public: resource_units consume_resources(reader_resources res); + future request_memory(size_t memory); + reader_resources consumed_resources() const; reader_resources base_resources() const; @@ -160,6 +169,8 @@ public: void on_start_sstable_read() noexcept; void on_finish_sstable_read() noexcept; + + uintptr_t id() { return reinterpret_cast(_impl.get()); } }; using reader_permit_opt = optimized_optional; @@ -171,7 +182,9 @@ class reader_permit::resource_units { friend class reader_permit; friend class reader_concurrency_semaphore; private: - resource_units(reader_permit permit, reader_resources res) noexcept; + class already_consumed_tag {}; + resource_units(reader_permit permit, reader_resources res, already_consumed_tag); + resource_units(reader_permit permit, reader_resources res); public: resource_units(const resource_units&) = delete; resource_units(resource_units&&) noexcept; @@ -184,6 +197,8 @@ public: reader_resources resources() const { return _resources; } }; +std::ostream& operator<<(std::ostream& os, reader_permit::state s); + /// Mark a permit as used. 
/// /// Conceptually, a permit is considered used, when at least one reader @@ -234,9 +249,13 @@ public: }; template -temporary_buffer make_tracked_temporary_buffer(temporary_buffer buf, reader_permit& permit) { - return temporary_buffer(buf.get_write(), buf.size(), - make_deleter(buf.release(), [units = permit.consume_memory(buf.size())] () mutable { units.reset(); })); +temporary_buffer make_tracked_temporary_buffer(temporary_buffer buf, reader_permit::resource_units units) { + return temporary_buffer(buf.get_write(), buf.size(), make_object_deleter(buf.release(), std::move(units))); +} + +inline temporary_buffer make_new_tracked_temporary_buffer(size_t size, reader_permit& permit) { + auto buf = temporary_buffer(size); + return temporary_buffer(buf.get_write(), buf.size(), make_object_deleter(buf.release(), permit.consume_memory(size))); } file make_tracked_file(file f, reader_permit p); diff --git a/replica/database.cc b/replica/database.cc index 71b83e0564..00dda836bd 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -328,14 +328,18 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat , _read_concurrency_sem(max_count_concurrent_reads, max_memory_concurrent_reads(), "_read_concurrency_sem", - max_inactive_queue_length()) + max_inactive_queue_length(), + _cfg.reader_concurrency_semaphore_serialize_limit_multiplier, + _cfg.reader_concurrency_semaphore_kill_limit_multiplier) // No timeouts or queue length limits - a failure here can kill an entire repair. // Trust the caller to limit concurrency. , _streaming_concurrency_sem( max_count_streaming_concurrent_reads, max_memory_streaming_concurrent_reads(), "_streaming_concurrency_sem", - std::numeric_limits::max()) + std::numeric_limits::max(), + utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value(std::numeric_limits::max())) // No limits, just for accounting. 
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction") , _system_read_concurrency_sem( @@ -343,7 +347,9 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat max_count_concurrent_reads, max_memory_system_concurrent_reads(), "_system_read_concurrency_sem", - std::numeric_limits::max()) + std::numeric_limits::max(), + utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value(std::numeric_limits::max())) , _row_cache_tracker(cache_tracker::register_metrics::yes) , _apply_stage("db_apply", &database::do_apply) , _version(empty_version) diff --git a/scylla-gdb.py b/scylla-gdb.py index 52a39acf6f..0c22070d3d 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -2058,37 +2058,31 @@ class scylla_memory(gdb.Command): gdb.write('\n') + @staticmethod + def format_semaphore_stats(semaphore): + semaphore_name = "{} sstable reads:".format(str(semaphore['_name'])[1:-1].split("_")[1]) + initial_count = int(semaphore["_initial_resources"]["count"]) + initial_memory = int(semaphore["_initial_resources"]["memory"]) + used_count = initial_count - int(semaphore["_resources"]["count"]) + used_memory = initial_memory - int(semaphore["_resources"]["memory"]) + try: + waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"]) + except gdb.error: # 5.1 compatibility + waiters = int(semaphore["_wait_list"]["_size"]) + return f'{semaphore_name:<24} {used_count:>3}/{initial_count:>3}, {used_memory:>13}/{initial_memory:>13}, queued: {waiters}' + @staticmethod def print_replica_stats(): db = find_db() if not db: return - try: - mem_stats = dict() - for key, sem in [('user_mem_str', db['_read_concurrency_sem']), ('streaming_mem_str', db['_streaming_concurrency_sem']), ('system_mem_str', db['_system_read_concurrency_sem'])]: - mem_stats[key] = '{:>13}/{:>13} B'.format(int(sem['_initial_resources']['memory'] - sem['_resources']['memory']), int(sem['_initial_resources']['memory'])) - except gdb.error: # <= 4.2 
compatibility - for key, sem in [('user_mem_str', db['_read_concurrency_sem']), ('streaming_mem_str', db['_streaming_concurrency_sem']), ('system_mem_str', db['_system_read_concurrency_sem'])]: - mem_stats[key] = 'remaining mem: {:>13} B'.format(int(sem['_resources']['memory'])) - database_typename = lookup_type(['replica::database', 'database'])[1].name gdb.write('Replica:\n') - gdb.write(' Read Concurrency Semaphores:\n' - ' user sstable reads: {user_sst_rd_count:>3}/{user_sst_rd_max_count:>3}, {user_mem_str}, queued: {user_sst_rd_queued}\n' - ' streaming sstable reads: {streaming_sst_rd_count:>3}/{streaming_sst_rd_max_count:>3}, {streaming_mem_str}, queued: {streaming_sst_rd_queued}\n' - ' system sstable reads: {system_sst_rd_count:>3}/{system_sst_rd_max_count:>3}, {system_mem_str}, queued: {system_sst_rd_queued}\n' - .format( - user_sst_rd_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_concurrent_reads')) - int(db['_read_concurrency_sem']['_resources']['count']), - user_sst_rd_max_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_concurrent_reads')), - user_sst_rd_queued=int(db['_read_concurrency_sem']['_wait_list']['_size']), - streaming_sst_rd_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_streaming_concurrent_reads')) - int(db['_streaming_concurrency_sem']['_resources']['count']), - streaming_sst_rd_max_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_streaming_concurrent_reads')), - streaming_sst_rd_queued=int(db['_streaming_concurrency_sem']['_wait_list']['_size']), - system_sst_rd_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_system_concurrent_reads')) - int(db['_system_read_concurrency_sem']['_resources']['count']), - system_sst_rd_max_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_system_concurrent_reads')), - system_sst_rd_queued=int(db['_system_read_concurrency_sem']['_wait_list']['_size']), - **mem_stats)) + gdb.write(' Read Concurrency Semaphores:\n {}\n {}\n 
{}\n'.format( + scylla_memory.format_semaphore_stats(db['_read_concurrency_sem']), + scylla_memory.format_semaphore_stats(db['_streaming_concurrency_sem']), + scylla_memory.format_semaphore_stats(db['_system_read_concurrency_sem']))) gdb.write(' Execution Stages:\n') for es_path in [('_apply_stage',)]: @@ -4285,7 +4279,7 @@ class scylla_generate_object_graph(gdb.Command): at. The generated graph is an image, which allows the visual inspection of the object graph. - The graph is generated with the help of `graphwiz`. The command + The graph is generated with the help of `graphviz`. The command generates `.dot` files which can be converted to images with the help of the `dot` utility. The command can do this if the output file is one of the supported image formats (e.g. `png`), otherwise only the `.dot` file @@ -4398,7 +4392,7 @@ class scylla_generate_object_graph(gdb.Command): help="Output file. Supported extensions are: dot, png, jpg, jpeg, svg and pdf." " Regardless of the extension, a `.dot` file will always be generated." " If the output is one of the graphic formats the command will convert the `.dot` file using the `dot` utility." - " In this case the dot utility from the graphwiz suite has to be installed on the machine." + " In this case the dot utility from the graphviz suite has to be installed on the machine." " To manually convert the `.dot` file do: `dot -Tpng graph.dot -o graph.png`.") parser.add_argument("-d", "--max-depth", action="store", type=int, default=5, help="Maximum depth to traverse the object graph. Set to -1 for unlimited depth. 
Default is 5.") @@ -5154,11 +5148,17 @@ class scylla_read_stats(gdb.Command): else: inactive_read_count = len(intrusive_list(semaphore['_inactive_reads'])) + + try: + waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"]) + except gdb.error: # 5.1 compatibility + waiters = int(semaphore["_wait_list"]["_size"]) + gdb.write("Semaphore {} with: {}/{} count and {}/{} memory resources, queued: {}, inactive={}\n".format( semaphore_name, initial_count - int(semaphore['_resources']['count']), initial_count, initial_memory - int(semaphore['_resources']['memory']), initial_memory, - int(semaphore['_wait_list']['_size']), inactive_read_count)) + waiters, inactive_read_count)) gdb.write("{:>10} {:5} {:>12} {}\n".format('permits', 'count', 'memory', 'table/description/state')) diff --git a/sstables/compress.cc b/sstables/compress.cc index 0dc886e3a8..3711412272 100644 --- a/sstables/compress.cc +++ b/sstables/compress.cc @@ -388,8 +388,9 @@ public: if (_pos != _beg_pos && addr.offset != 0) { throw std::runtime_error("compressed reader out of sync"); } - return _input_stream->read_exactly(addr.chunk_len). - then([this, addr](temporary_buffer buf) { + return _input_stream->read_exactly(addr.chunk_len).then([this, addr](temporary_buffer buf) { + return _permit.request_memory(_compression_metadata->uncompressed_chunk_length()).then( + [this, addr, buf = std::move(buf)] (reader_permit::resource_units res_units) mutable { // The last 4 bytes of the chunk are the adler32/crc32 checksum // of the rest of the (compressed) chunk. 
auto compressed_len = addr.chunk_len - 4; @@ -415,7 +416,8 @@ public: _pos += out.size(); _underlying_pos += addr.chunk_len; - return make_tracked_temporary_buffer(std::move(out), _permit); + return make_tracked_temporary_buffer(std::move(out), std::move(res_units)); + }); }); } diff --git a/sstables/consumer.hh b/sstables/consumer.hh index d83da71c99..c90b60d11b 100644 --- a/sstables/consumer.hh +++ b/sstables/consumer.hh @@ -134,7 +134,7 @@ private: return read_status::ready; } else { _read_bytes.clear(); - _read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer(len), _permit)); + _read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit)); std::copy(data.begin(), data.end(), _read_bytes.front().get_write()); _read_bytes_len = len; _pos = data.size(); @@ -208,7 +208,7 @@ public: } else { // copy what we have so far, read the rest later _read_bytes.clear(); - _read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer(len), _permit)); + _read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit)); std::copy(data.begin(), data.end(),_read_bytes.front().get_write()); _read_bytes_len = len; _read_bytes_where_contiguous = &where; @@ -273,7 +273,7 @@ public: return read_bytes_contiguous(data, static_cast(_u64), where); } else { _read_bytes.clear(); - _read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer(len), _permit)); + _read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit)); std::copy(data.begin(), data.end(),_read_bytes.front().get_write()); _read_bytes_len = len; _pos = data.size(); @@ -298,7 +298,7 @@ public: return read_bytes(data, static_cast(_u64), where); } else { _read_bytes.clear(); - _read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer(len), _permit)); + _read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit)); std::copy(data.begin(), data.end(),_read_bytes.front().get_write()); _read_bytes_len = len; _pos = data.size(); diff --git 
a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index 86055ebe6b..985906db0d 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -25,7 +25,9 @@ sstables_manager::sstables_manager( max_count_sstable_metadata_concurrent_reads, max_memory_sstable_metadata_concurrent_reads(available_memory), "sstable_metadata_concurrency_sem", - std::numeric_limits::max()) + std::numeric_limits::max(), + utils::updateable_value(std::numeric_limits::max()), + utils::updateable_value(std::numeric_limits::max())) , _dir_semaphore(dir_sem) { } diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index b1c9156a06..49c7ba0c9e 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -674,11 +674,8 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0); // Drain all resources of the semaphore - const auto available_resources = semaphore.available_resources(); - semaphore.consume(available_resources); - auto release_resources = defer([&semaphore, available_resources] { - semaphore.signal(available_resources); - }); + auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout); + auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto cmd2 = query::read_command(s->id(), s->version(), diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index ceb44d2def..bb0000fdca 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -9,18 +9,25 @@ #include #include #include "reader_concurrency_semaphore.hh" +#include "sstables/sstables_manager.hh" #include "test/lib/log.hh" #include "test/lib/simple_schema.hh" +#include "test/lib/cql_assertions.hh" +#include "test/lib/cql_test_env.hh" #include "test/lib/eventually.hh" #include 
"test/lib/random_utils.hh" #include "test/lib/random_schema.hh" +#include "test/lib/tmpdir.hh" #include +#include #include #include #include #include "readers/empty_v2.hh" +#include "readers/from_mutations_v2.hh" #include "replica/database.hh" // new_reader_base_cost is there :( +#include "db/config.hh" SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) { simple_schema s; @@ -136,8 +143,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources()); if (i % 2) { - const auto consumed_resources = semaphore.available_resources(); - semaphore.consume(consumed_resources); + auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto fut = make_ready_future<>(); if (permit->needs_readmission()) { @@ -145,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves } BOOST_REQUIRE(!fut.available()); - semaphore.signal(consumed_resources); + consumed_resources.reset(); fut.get(); } else { if (permit->needs_readmission()) { @@ -386,7 +393,7 @@ class dummy_file_impl : public file_impl { } virtual future> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) override { - temporary_buffer buf(1024); + temporary_buffer buf(range_size); memset(buf.get_write(), 0xff, buf.size()); @@ -405,23 +412,23 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) { BOOST_REQUIRE_EQUAL(4 * 1024, semaphore.available_resources().memory); - auto buf1 = tracked_file.dma_read_bulk(0, 0).get0(); + auto buf1 = tracked_file.dma_read_bulk(0, 1024).get0(); BOOST_REQUIRE_EQUAL(3 * 1024, semaphore.available_resources().memory); - auto buf2 = tracked_file.dma_read_bulk(0, 0).get0(); + auto buf2 = tracked_file.dma_read_bulk(0, 1024).get0(); 
BOOST_REQUIRE_EQUAL(2 * 1024, semaphore.available_resources().memory); - auto buf3 = tracked_file.dma_read_bulk(0, 0).get0(); + auto buf3 = tracked_file.dma_read_bulk(0, 1024).get0(); BOOST_REQUIRE_EQUAL(1 * 1024, semaphore.available_resources().memory); - auto buf4 = tracked_file.dma_read_bulk(0, 0).get0(); + auto buf4 = tracked_file.dma_read_bulk(0, 1024).get0(); BOOST_REQUIRE_EQUAL(0 * 1024, semaphore.available_resources().memory); - auto buf5 = tracked_file.dma_read_bulk(0, 0).get0(); + auto buf5 = tracked_file.dma_read_bulk(0, 1024).get0(); BOOST_REQUIRE_EQUAL(-1 * 1024, semaphore.available_resources().memory); // Reassing buf1, should still have the same amount of units. - buf1 = tracked_file.dma_read_bulk(0, 0).get0(); + buf1 = tracked_file.dma_read_bulk(0, 1024).get0(); BOOST_REQUIRE_EQUAL(-1 * 1024, semaphore.available_resources().memory); // Move buf1 to the heap, so that we can safely destroy it @@ -619,7 +626,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted + uint64_t(can_admit)); - // Deliberately not checking `reads_enqueued`, a read can be enqueued temporarily during the admission process. + // Deliberately not checking `reads_enqueued_for_admission`, a read can be enqueued temporarily during the admission process. 
if (can_admit == expected_can_admit) { testlog.trace("admission scenario '{}' with expected_can_admit={} passed at {}:{}", description, expected_can_admit, sl.file_name(), @@ -644,7 +651,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE(!enqueued_permit_fut.available()); - BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued, stats_before.reads_enqueued + 1); + BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued_for_admission, stats_before.reads_enqueued_for_admission + 1); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted); BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); } @@ -682,10 +689,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- resources { - const auto resources = reader_resources::with_memory(semaphore.available_resources().memory); - semaphore.consume(resources); + auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout); + sponge_permit.consume_resources(reader_resources::with_memory(semaphore.available_resources().memory)); require_can_admit(true, "semaphore with no memory but all count available"); - semaphore.signal(resources); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); require_can_admit(true, "semaphore in initial state"); @@ -711,7 +717,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted + 1); - BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued, stats_before.reads_enqueued); + BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued_for_admission, stats_before.reads_enqueued_for_admission); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); require_can_admit(true, "semaphore in initial state"); @@ -768,7 +774,7 @@ 
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted); - BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued, stats_before.reads_enqueued + 1); + BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued_for_admission, stats_before.reads_enqueued_for_admission + 1); BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); auto cookie2 = post_enqueue_hook(cookie1); @@ -1007,7 +1013,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // * no more count resources left auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout); BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); - BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0); BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1); @@ -1068,7 +1074,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(1, 3 * 1024)); auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); - BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1); semaphore.set_resources({4, 4 * 1024}); @@ -1077,3 +1083,492 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024)); permit3_fut.get(); } + +namespace { + +class allocating_reader { + static constexpr size_t admission_cost = 1024; + static constexpr size_t buf_size = 1024; + static constexpr size_t read_iterations = 4; +public: + enum class state { + 
wait_for_admission, + request_memory, + wait_for_memory, + release_memory, + done, + }; + const char* to_string(state s) { + switch (s) { + case state::wait_for_admission: return "state::wait_for_admission"; + case state::request_memory: return "state::request_memory"; + case state::wait_for_memory: return "state::wait_for_memory"; + case state::release_memory: return "state::release_memory"; + case state::done: return "state::done"; + } + }; +private: + reader_concurrency_semaphore& _sem; + state _state = state::wait_for_admission; + std::optional> _admission_fut; + std::optional _permit; + std::list _current_resource_units; + std::list> _pending_resource_units; + unsigned _read_count = 0; + bool _success = true; +public: + explicit allocating_reader(reader_concurrency_semaphore& sem) : _sem(sem) { + testlog.debug("[{}] allocating_reader created", fmt::ptr(this)); + _admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout).then_wrapped([this] (future&& permit_fut) { + try { + _permit = std::move(permit_fut.get()); + _state = state::request_memory; + } catch (...) { + _state = state::done; + _success = false; + } + }); + } + ~allocating_reader() { } + void operator()() { + testlog.debug("[{}|p:0x{:x}] allocating_reader(): _state={}, _permit.state={}, _permit.resources={}, _sem.resources={}", + fmt::ptr(this), + _permit ? _permit->id() : 0, + to_string(_state), + _permit ? format("{}", _permit->get_state()) : "N/A", + _permit ? _permit->consumed_resources() : reader_resources{}, + _sem.consumed_resources()); + switch (_state) { + case state::wait_for_admission: + break; + case state::request_memory: + { + size_t n = 0; + if (!_read_count) { + n = 1; + } else { + n = tests::random::get_int(1, 8); + } + ++_read_count; + try { + for (size_t i = 0; i < n; ++i) { + _pending_resource_units.emplace_back(_permit->request_memory(buf_size)); + } + } catch (std::bad_alloc&) { + testlog.debug("[{}|p:{}] read killed", fmt::ptr(this), _permit ? 
_permit->id() : 0); + _read_count = read_iterations; + } + _state = state::wait_for_memory; + break; + } + case state::wait_for_memory: + for (auto it = _pending_resource_units.begin(); it != _pending_resource_units.end();) { + if (it->available()) { + try { + _current_resource_units.push_back(it->get()); + } catch (std::bad_alloc&) { + testlog.debug("[{}|p:{}] read killed", fmt::ptr(this), _permit ? _permit->id() : 0); + _read_count = read_iterations; + } + it = _pending_resource_units.erase(it); + } else { + ++it; + } + } + if (_pending_resource_units.empty()) { + _state = state::release_memory; + } + break; + case state::release_memory: + if (_current_resource_units.empty()) { + if (_read_count == read_iterations) { + _state = state::done; + } else if (!tests::random::get_int(0, 7)) { + _state = state::done; + } else { + _state = state::request_memory; + } + } else { + _current_resource_units.pop_front(); + } + break; + case state::done: + _permit.reset(); + break; + } + } + bool done() const { return _state == state::done; } + bool success() const { return _success; } + reader_resources resources() const { return _permit ? _permit->consumed_resources() : reader_resources{}; } + future<> close() { + if (_admission_fut) { + co_await std::move(_admission_fut).value(); + } + co_await coroutine::parallel_for_each(_pending_resource_units.begin(), _pending_resource_units.end(), [this] (future& fut) { + return std::move(fut).then_wrapped([] (future&& fut) { + try { + fut.get(); + } catch (...) 
{ + } + }); + }); + _current_resource_units.clear(); + _permit.reset(); + } +}; + + +void dump_stats(const reader_concurrency_semaphore& semaphore, log_level lvl, bool with_diag = false) { + const auto& stats = semaphore.get_stats(); + testlog.log(lvl, "stats = {{\n" + "\t.permit_based_evictions = {}\n" + "\t.time_based_evictions = {}\n" + "\t.inactive_reads = {}\n" + "\t.total_successful_reads = {}\n" + "\t.total_failed_reads = {}\n" + "\t.total_reads_shed_due_to_overload = {}\n" + "\t.total_reads_killed_due_to_kill_limit = {}\n" + "\t.reads_admitted = {}\n" + "\t.reads_enqueued_for_admission = {}\n" + "\t.reads_enqueued_for_memory = {}\n" + "\t.total_permits = {}\n" + "\t.current_permits = {}\n" + "\t.used_permits = {}\n" + "\t.blocked_permits = {}\n" + "}}{}", + stats.permit_based_evictions, + stats.time_based_evictions, + stats.inactive_reads, + stats.total_successful_reads, + stats.total_failed_reads, + stats.total_reads_shed_due_to_overload, + stats.total_reads_killed_due_to_kill_limit, + stats.reads_admitted, + stats.reads_enqueued_for_admission, + stats.reads_enqueued_for_memory, + stats.total_permits, + stats.current_permits, + stats.used_permits, + stats.blocked_permits, + with_diag ? format("\n{}", semaphore.dump_diagnostics()) : ""); +}; + +} //anonymous namespace + +// Check that the memory consumption limiting mechanism doesn't leak any +// resources or cause any internal inconsistencies in the semaphore.
+SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_leaks) { + const auto initial_resources = reader_concurrency_semaphore::resources{4, 4 * 1024}; + const auto serialize_multiplier = 2; + const auto kill_multiplier = 3; + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, + utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier)); + auto stop_sem = deferred_stop(semaphore); + + const size_t reader_count_target = 6; + const size_t iteration_limit = 100; + + std::list readers; + + size_t i = 0; + bool done = false; + sstring error = ""; + while (!done) { + testlog.debug("iteration {}", i); + + for (auto& rd : readers) { + rd(); + reader_resources all_permit_res; + semaphore.foreach_permit([&all_permit_res] (const reader_permit& p) { all_permit_res += p.consumed_resources(); }); + if (semaphore.consumed_resources() != all_permit_res) { + testlog.error("resource mismatch: semaphore.consumed_resources() ({}) != sum of resources in permits ({})", semaphore.consumed_resources(), all_permit_res); + } + } + + if (readers.size() < reader_count_target) { + readers.emplace_back(semaphore); + } + done = std::all_of(readers.begin(), readers.end(), std::mem_fn(&allocating_reader::done)); + + dump_stats(semaphore, log_level::debug, true); + + reader_resources all_permit_res; + semaphore.foreach_permit([&all_permit_res] (const reader_permit& p) { all_permit_res += p.consumed_resources(); }); + + if (semaphore.consumed_resources().memory >= (semaphore.initial_resources().memory * kill_multiplier)) { + error = format("kill limit failed: semaphore.consumed_resources() ({}) >= kill limit ({})", semaphore.consumed_resources().memory, (semaphore.initial_resources().memory * kill_multiplier)); + } else if (semaphore.consumed_resources() != all_permit_res) { + error = format("resource mismatch: semaphore.consumed_resources() ({}) != sum of resources in permits ({})", 
semaphore.consumed_resources(), all_permit_res); + } else if (i >= iteration_limit) { + error = format("test failed to finish in {} iterations", iteration_limit); + } + + if (error.empty()) { + ++i; + } else { + testlog.error("stopping test at iteration {}: {}", i, error); + done = true; + } + + seastar::thread::yield(); + } + dump_stats(semaphore, log_level::info, false); + parallel_for_each(readers.begin(), readers.end(), [] (allocating_reader& rd) { + return rd.close(); + }).get(); + if (!error.empty()) { + BOOST_FAIL(error); + } + const bool all_ok = std::all_of(readers.begin(), readers.end(), std::mem_fn(&allocating_reader::success)); + BOOST_REQUIRE(all_ok); +} + +struct memory_limit_table { + schema_ptr schema; + tmpdir sst_dir; + int32_t pk; + int32_t ck; + sstring value; +}; +memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_num_sstables) { + auto& db = env.local_db(); + + sstring value(256 * 1024, '0'); + + env.execute_cql("CREATE TABLE ks.tbl (pk int, ck int, value text, primary key (pk, ck)) WITH compaction = {'class': 'NullCompactionStrategy'};").get(); + + env.require_table_exists("ks", "tbl").get(); + + auto& tbl = db.find_column_family("ks", "tbl"); + auto s = tbl.schema(); + auto& sst_man = tbl.get_sstables_manager(); + auto& semaphore = db.get_reader_concurrency_semaphore(); + + int32_t pk = 0; + dht::decorated_key dk(dht::token(), partition_key::make_empty()); + do { + dk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(++pk))); + } while (dht::shard_of(*s, dk.token()) != this_shard_id()); + + const int32_t ck = 0; + + mutation mut(s, std::move(dk)); + mut.set_clustered_cell(clustering_key::from_single_value(*s, serialized(ck)), to_bytes("value"), data_value(value), 0); + + auto sstables_dir = tmpdir(); + + const auto sstable_write_concurrency = 16; + + auto num_sstables = 0; + parallel_for_each(boost::irange(0, sstable_write_concurrency), [&] (int i) { + return seastar::async([&, i] { + while 
(num_sstables != target_num_sstables) { + ++num_sstables; + auto sst = sst_man.make_sstable(s, sstables_dir.path().string(), sstables::generation_type{num_sstables}, sst_man.get_highest_supported_format(), sstables::sstable_format_types::big); + auto writer_cfg = sst_man.configure_writer("test"); + sst->write_components( + make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout), mut, s->full_slice()), + 1, + s, + writer_cfg, + encoding_stats{}).get(); + sst->load().get(); + tbl.add_sstable_and_update_cache(std::move(sst)).get(); + } + }); + }).get(); + + return {s, std::move(sstables_dir), pk, ck, std::move(value)}; +} + +constexpr uint64_t target_memory = uint64_t(1) << 28; // 256MB + +// Check that the memory consumption limiting mechanism of the semaphore does +// prevent OOM crashes. +// The test fails by OOM crashing. +// This test should be run with 256MB of memory. +SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_oom) { + if (memory::stats().total_memory() != target_memory) { + std::cerr << "Test " << get_name() << " should be run with 256M of memory, make sure you invoke with -m256M" << std::endl; + return make_ready_future<>(); + } + + auto db_cfg_ptr = make_shared(); + auto& db_cfg = *db_cfg_ptr; + + // Disable the cache altogether, we want all reads to go to disk. + db_cfg.enable_cache(false); + db_cfg.enable_commitlog(false); + db_cfg.reader_concurrency_semaphore_serialize_limit_multiplier.set(2, utils::config_file::config_source::CommandLine); + db_cfg.reader_concurrency_semaphore_kill_limit_multiplier.set(4, utils::config_file::config_source::CommandLine); + + return do_with_cql_env_thread([] (cql_test_env& env) { + auto tbl = create_memory_limit_table(env, 256); + + auto& db = env.local_db(); + auto& semaphore = db.get_reader_concurrency_semaphore(); + + const auto num_reads = 128; + + auto read_id = env.prepare("SELECT value FROM ks.tbl WHERE pk = ? 
AND ck = ?").get0(); + + parallel_for_each(boost::irange(0, num_reads), [&] (int i) { + return env.execute_prepared(read_id, {cql3::raw_value::make_value(serialized(tbl.pk)), cql3::raw_value::make_value(serialized(tbl.ck))}).then_wrapped( + [&] (future> fut) { + if (fut.failed()) { + // We expect failed, OOM-killed reads here. + // No way to verify why they failed so we swallow all failures. + fut.ignore_ready_future(); + return; + } + assert_that(fut.get()).is_rows().with_rows_ignore_order({ {serialized(tbl.value)} }); + }); + }).get(); + return make_ready_future<>(); + }, std::move(db_cfg_ptr)); +} + +// Check that the memory consumption limiting mechanism of the semaphore does +// prevent reads exhausting memory to the extent that they start to fail due to +// bad alloc (but not necessarily crash the node). +// Instead the limiting mechanism engages and kills reads before they get to that +// point. From the outside, a read failing due to OOM and a read killed by the +// limiting mechanism looks the same. To differentiate, the test checks that all +// failures were caused by the limiting mechanism. +// This test should be run with 256M memory. +SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_engages) { + if (memory::stats().total_memory() != target_memory) { + std::cerr << "Test " << get_name() << " should be run with 256M of memory, make sure you invoke with -m256M" << std::endl; + return make_ready_future<>(); + } + auto db_cfg_ptr = make_shared(); + auto& db_cfg = *db_cfg_ptr; + + // Disable the cache altogether, we want all reads to go to disk.
+ db_cfg.enable_cache(false); + db_cfg.enable_commitlog(false); + db_cfg.reader_concurrency_semaphore_serialize_limit_multiplier.set(2, utils::config_file::config_source::CommandLine); + db_cfg.reader_concurrency_semaphore_kill_limit_multiplier.set(4, utils::config_file::config_source::CommandLine); + + return do_with_cql_env_thread([] (cql_test_env& env) { + auto tbl = create_memory_limit_table(env, 32); + + auto& db = env.local_db(); + auto& semaphore = db.get_reader_concurrency_semaphore(); + + const auto num_reads = 128; + + auto read_id = env.prepare("SELECT value FROM ks.tbl WHERE pk = ? AND ck = ?").get0(); + + // We first check that the test params are not too strict and a single + // read can finish successfully. + try { + auto msg = env.execute_prepared(read_id, {cql3::raw_value::make_value(serialized(tbl.pk)), cql3::raw_value::make_value(serialized(tbl.ck))}).get(); + assert_that(msg).is_rows().with_rows_ignore_order({ {serialized(tbl.value)} }); + } catch (...) { + BOOST_FAIL(fmt::format("canary read failed with: {}", std::current_exception())); + } + + uint64_t successful_reads = 0; + uint64_t failed_reads = 0; + + parallel_for_each(boost::irange(0, num_reads), [&] (int i) { + return env.execute_prepared(read_id, {cql3::raw_value::make_value(serialized(tbl.pk)), cql3::raw_value::make_value(serialized(tbl.ck))}).then_wrapped( + [&] (future> fut) { + if (fut.failed()) { + // We expect failed, OOM-killed reads here. + // No way to verify why they failed so we swallow all failures. + fut.ignore_ready_future(); + ++failed_reads; + return; + } + assert_that(fut.get()).is_rows().with_rows_ignore_order({ {serialized(tbl.value)} }); + ++successful_reads; + }); + }).get(); + + testlog.info("total reads: {} ({} successful, {} failed)", num_reads, successful_reads, failed_reads); + dump_stats(semaphore, log_level::info, false); + + // There should be both successful and failed reads. + // If there is only one or the other, the test is not testing anything. 
+ // We also check that the memory limiting mechanism of the semaphore was engaged. + // The test is meaningless without it. + // In the slow debug builds we never reach the kill limit for some reason. +#ifndef DEBUG + BOOST_REQUIRE_GE(failed_reads, 1); +#endif + BOOST_REQUIRE_GE(successful_reads, 1); + + // Almost each failed read should have been in the memory queue at one point. + BOOST_REQUIRE_GE(semaphore.get_stats().reads_enqueued_for_memory, semaphore.get_stats().total_reads_killed_due_to_kill_limit); + + // All failures must be caused by the kill limit triggering. + BOOST_REQUIRE_EQUAL(semaphore.get_stats().total_reads_killed_due_to_kill_limit, failed_reads); + + return make_ready_future<>(); + }, std::move(db_cfg_ptr)); +} + +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preserves_state) { + const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; + const auto serialize_multiplier = 2; + const auto kill_multiplier = std::numeric_limits::max(); // we don't want this to interfere with our test + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100, + utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier)); + auto stop_sem = deferred_stop(semaphore); + + auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + + uint64_t reads_enqueued_for_memory = 0; + + auto do_check = [&] (reader_permit& permit, uint64_t used, uint64_t blocked, std::source_location sl) { + testlog.info("do_check() {}:{}", sl.file_name(), sl.line()); + + BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 2); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, used); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, blocked); + + auto units1 = permit.request_memory(1024).get(); + + BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 2); + 
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, used); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, blocked); + + auto sponge_units = sponge_permit.request_memory(8 * 1024).get(); + + // sponge permit is now the blessed one + + auto units2_fut = permit.request_memory(1024); + BOOST_REQUIRE(!units2_fut.available()); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, ++reads_enqueued_for_memory); + + sponge_units.reset(); + auto units2 = units2_fut.get(); + + BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 2); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, used); + BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, blocked); + }; + + // unused + { + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + do_check(permit, 0, 0, std::source_location::current()); + } + + // used + { + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + reader_permit::used_guard ug{permit}; + do_check(permit, 1, 0, std::source_location::current()); + } + + // blocked + { + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + reader_permit::used_guard ug{permit}; + reader_permit::blocked_guard bg{permit}; + do_check(permit, 1, 1, std::source_location::current()); + } +} diff --git a/test/boost/suite.yaml b/test/boost/suite.yaml index 53027ae077..bdb7cdd0b1 100644 --- a/test/boost/suite.yaml +++ b/test/boost/suite.yaml @@ -31,6 +31,6 @@ custom_args: cql_query_test: - '-c2 -m2G --fail-on-abandoned-failed-futures=true' reader_concurrency_semaphore_test: - - '-c1 -m1G' + - '-c1 -m256M' run_in_debug: - logalloc_standard_allocator_segment_pool_backend_test diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 3be4514f7e..0f52619c19 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -550,11 +550,8 @@ 
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { }).get0(); // consume all units except what is needed to admit a single reader. - const auto consumed_resources = sem.initial_resources() - reader_resources{1, replica::new_reader_base_cost}; - sem.consume(consumed_resources); - auto release_resources = defer([&sem, consumed_resources] { - sem.signal(consumed_resources); - }); + auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout); + auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost}); testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 7e79352fa2..bd93b8c53d 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -493,6 +493,12 @@ public: debug::the_database = nullptr; }); auto cfg = cfg_in.db_config; + if (!cfg->reader_concurrency_semaphore_serialize_limit_multiplier.is_set()) { + cfg->reader_concurrency_semaphore_serialize_limit_multiplier.set(std::numeric_limits::max()); + } + if (!cfg->reader_concurrency_semaphore_kill_limit_multiplier.is_set()) { + cfg->reader_concurrency_semaphore_kill_limit_multiplier.set(std::numeric_limits::max()); + } tmpdir data_dir; auto data_dir_path = data_dir.path().string(); if (!cfg->data_file_directories.is_set()) { diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index c970237065..ce21eafe18 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -97,6 +97,9 @@ def make_scylla_conf(workdir: pathlib.Path, host_addr: str, seed_addrs: List[str 'permissions_update_interval_in_ms': 100, 'permissions_validity_in_ms': 100, + + 'reader_concurrency_semaphore_serialize_limit_multiplier': 0, + 'reader_concurrency_semaphore_kill_limit_multiplier': 0, } # Seastar options can not be passed through scylla.yaml, use command line