Merge 'reader_concurrency_semaphore: add more layers of defense against OOM' from Botond Dénes
The reader concurrency semaphore has no mechanism to limit the memory consumption of already admitted reads. Once the collective memory consumption of all the admitted reads is above the limit, all it can do is to not admit any more. Sometimes this is not enough and the memory consumption of the already admitted reads balloons to the point of OOMing the node. This pull-request offers a solution to this: it introduces two more layers of defense above this: a soft and a hard limit. Both are multipliers applied on the semaphore's normal memory limit. When the soft limit threshold is surpassed, all readers but one are blocked via a new blocking `request_memory()` call which is used by the `tracking_file_impl`. The reader to be allowed to proceed is chosen at random: it is the first reader which happens to request memory after the limit is surpassed. This is both very simple and should avoid situations where the algorithm choosing the reader to be allowed to proceed chooses a reader which will then always time out. When the hard limit threshold is surpassed, `reader_concurrency_semaphore::consume()` starts throwing `std::bad_alloc`. This again will result in eliminating whichever reader was unlucky enough to request memory at the right moment. With this, the semaphore is now effectively enforcing an upper bound for memory consumption, defined by the hard limit. 
Refs: https://github.com/scylladb/scylladb/issues/11927 Closes #11955 * github.com:scylladb/scylladb: test: reader_concurrency_semaphore_test: add tests for semaphore memory limits reader_permit: expose operator<<(reader_permit::state) reader_permit: add id() accessor reader_concurrency_semaphore: add foreach_permit() reader_concurrency_semaphore: document the new memory limits reader_concurrency_semaphore: add OOM killer reader_concurrency_semaphore: make consume() and signal() private test: stop using reader_concurrency_semaphore::{consume,signal}() directly reader_concurrency_semaphore: move consume() out-of-line reader_permit: consume(): make it exception-safe reader_permit: resource_units::reset(): only call consume() if needed reader_concurrency_semaphore: tracked_file_impl: use request_memory() reader_concurrency_semaphore: add request_memory() reader_concurrency_semaphore: wrap wait list reader_concurrency_semaphore: add {serialize,kill}_limit_multiplier parameters test/boost/reader_concurrency_semaphore_test: dummy_file_impl: don't use hardoced buffer size reader_permit: add make_new_tracked_temporary_buffer() reader_permit: add get_state() accessor reader_permit: resource_units: add constructor for already consumed res reader_permit: resource_units: remove noexcept qualifier from constructor db/config: introduce reader_concurrency_semaphore_{serialize,kill}_limit_multiplier scylla-gdb.py: scylla-memory: extract semaphore stats formatting code scylla-gdb.py: fix spelling of "graphviz"
This commit is contained in:
@@ -843,6 +843,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, max_memory_for_unlimited_query_hard_limit(this, "max_memory_for_unlimited_query_hard_limit", "max_memory_for_unlimited_query", liveness::LiveUpdate, value_status::Used, (uint64_t(100) << 20),
|
||||
"Maximum amount of memory a query, whose memory consumption is not naturally limited, is allowed to consume, e.g. non-paged and reverse queries. "
|
||||
"This is the hard limit, queries violating this limit will be aborted.")
|
||||
, reader_concurrency_semaphore_serialize_limit_multiplier(this, "reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2,
|
||||
"Start serializing reads after their collective memory consumption goes above $normal_limit * $multiplier.")
|
||||
, reader_concurrency_semaphore_kill_limit_multiplier(this, "reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4,
|
||||
"Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.")
|
||||
, twcs_max_window_count(this, "twcs_max_window_count", liveness::LiveUpdate, value_status::Used, 50,
|
||||
"The maximum number of compaction windows allowed when making use of TimeWindowCompactionStrategy. A setting of 0 effectively disables the restriction.")
|
||||
, initial_sstable_loading_concurrency(this, "initial_sstable_loading_concurrency", value_status::Used, 4u,
|
||||
|
||||
@@ -339,6 +339,8 @@ public:
|
||||
named_value<uint32_t> max_clustering_key_restrictions_per_query;
|
||||
named_value<uint64_t> max_memory_for_unlimited_query_soft_limit;
|
||||
named_value<uint64_t> max_memory_for_unlimited_query_hard_limit;
|
||||
named_value<uint32_t> reader_concurrency_semaphore_serialize_limit_multiplier;
|
||||
named_value<uint32_t> reader_concurrency_semaphore_kill_limit_multiplier;
|
||||
named_value<uint32_t> twcs_max_window_count;
|
||||
named_value<unsigned> initial_sstable_loading_concurrency;
|
||||
named_value<bool> enable_3_1_0_compatibility_mode;
|
||||
|
||||
@@ -26,9 +26,14 @@ std::ostream& operator<<(std::ostream& os , const reader_resources& r) {
|
||||
return os;
|
||||
}
|
||||
|
||||
reader_permit::resource_units::resource_units(reader_permit permit, reader_resources res) noexcept
|
||||
reader_permit::resource_units::resource_units(reader_permit permit, reader_resources res, already_consumed_tag)
|
||||
: _permit(std::move(permit)), _resources(res) {
|
||||
}
|
||||
|
||||
reader_permit::resource_units::resource_units(reader_permit permit, reader_resources res)
|
||||
: _permit(std::move(permit)) {
|
||||
_permit.consume(res);
|
||||
_resources = res;
|
||||
}
|
||||
|
||||
reader_permit::resource_units::resource_units(resource_units&& o) noexcept
|
||||
@@ -58,7 +63,9 @@ void reader_permit::resource_units::add(resource_units&& o) {
|
||||
}
|
||||
|
||||
void reader_permit::resource_units::reset(reader_resources res) {
|
||||
_permit.consume(res);
|
||||
if (res.non_zero()) {
|
||||
_permit.consume(res);
|
||||
}
|
||||
if (_resources.non_zero()) {
|
||||
_permit.signal(_resources);
|
||||
}
|
||||
@@ -83,6 +90,9 @@ class reader_permit::impl
|
||||
db::timeout_clock::time_point _timeout;
|
||||
query::max_result_size _max_result_size{query::result_memory_limiter::unlimited_result_size};
|
||||
uint64_t _sstables_read = 0;
|
||||
size_t _requested_memory = 0;
|
||||
std::optional<shared_future<>> _memory_future;
|
||||
uint64_t _oom_kills = 0;
|
||||
|
||||
private:
|
||||
void on_permit_used() {
|
||||
@@ -105,6 +115,10 @@ private:
|
||||
if (_used_branches) {
|
||||
_state = reader_permit::state::active_used;
|
||||
on_permit_used();
|
||||
if (_blocked_branches) {
|
||||
_state = reader_permit::state::active_blocked;
|
||||
on_permit_blocked();
|
||||
}
|
||||
} else {
|
||||
_state = reader_permit::state::active_unused;
|
||||
}
|
||||
@@ -112,6 +126,9 @@ private:
|
||||
|
||||
void on_permit_inactive(reader_permit::state st) {
|
||||
_state = st;
|
||||
if (_marked_as_blocked) {
|
||||
on_permit_unblocked();
|
||||
}
|
||||
if (_marked_as_used) {
|
||||
on_permit_unused();
|
||||
}
|
||||
@@ -193,8 +210,13 @@ public:
|
||||
return _state;
|
||||
}
|
||||
|
||||
void on_waiting() {
|
||||
on_permit_inactive(reader_permit::state::waiting);
|
||||
void on_waiting_for_admission() {
|
||||
on_permit_inactive(reader_permit::state::waiting_for_admission);
|
||||
}
|
||||
|
||||
void on_waiting_for_memory(future<> fut) {
|
||||
on_permit_inactive(reader_permit::state::waiting_for_memory);
|
||||
_memory_future.emplace(std::move(fut));
|
||||
}
|
||||
|
||||
void on_admission() {
|
||||
@@ -204,6 +226,17 @@ public:
|
||||
_base_resources_consumed = true;
|
||||
}
|
||||
|
||||
void on_granted_memory() {
|
||||
if (_state == reader_permit::state::waiting_for_memory) {
|
||||
on_permit_active();
|
||||
}
|
||||
consume({0, std::exchange(_requested_memory, 0)});
|
||||
}
|
||||
|
||||
future<> get_memory_future() {
|
||||
return _memory_future->get_future();
|
||||
}
|
||||
|
||||
void on_register_as_inactive() {
|
||||
assert(_state == reader_permit::state::active_unused || _state == reader_permit::state::active_used);
|
||||
on_permit_inactive(reader_permit::state::inactive);
|
||||
@@ -224,8 +257,8 @@ public:
|
||||
}
|
||||
|
||||
void consume(reader_resources res) {
|
||||
_semaphore.consume(*this, res);
|
||||
_resources += res;
|
||||
_semaphore.consume(res);
|
||||
}
|
||||
|
||||
void signal(reader_resources res) {
|
||||
@@ -233,6 +266,13 @@ public:
|
||||
_semaphore.signal(res);
|
||||
}
|
||||
|
||||
future<resource_units> request_memory(size_t memory) {
|
||||
_requested_memory += memory;
|
||||
return _semaphore.request_memory(*this, memory).then([this, memory] {
|
||||
return resource_units(reader_permit(shared_from_this()), {0, ssize_t(memory)}, resource_units::already_consumed_tag{});
|
||||
});
|
||||
}
|
||||
|
||||
reader_resources resources() const {
|
||||
return _resources;
|
||||
}
|
||||
@@ -347,6 +387,10 @@ public:
|
||||
--_semaphore._stats.disk_reads;
|
||||
}
|
||||
}
|
||||
|
||||
bool on_oom_kill() noexcept {
|
||||
return !bool(_oom_kills++);
|
||||
}
|
||||
};
|
||||
|
||||
static_assert(std::is_nothrow_copy_constructible_v<reader_permit>);
|
||||
@@ -368,14 +412,26 @@ reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const sche
|
||||
{
|
||||
}
|
||||
|
||||
void reader_permit::on_waiting() {
|
||||
_impl->on_waiting();
|
||||
void reader_permit::on_waiting_for_admission() {
|
||||
_impl->on_waiting_for_admission();
|
||||
}
|
||||
|
||||
void reader_permit::on_waiting_for_memory(future<> fut) {
|
||||
_impl->on_waiting_for_memory(std::move(fut));
|
||||
}
|
||||
|
||||
void reader_permit::on_admission() {
|
||||
_impl->on_admission();
|
||||
}
|
||||
|
||||
void reader_permit::on_granted_memory() {
|
||||
_impl->on_granted_memory();
|
||||
}
|
||||
|
||||
future<> reader_permit::get_memory_future() {
|
||||
return _impl->get_memory_future();
|
||||
}
|
||||
|
||||
reader_permit::~reader_permit() {
|
||||
}
|
||||
|
||||
@@ -383,6 +439,10 @@ reader_concurrency_semaphore& reader_permit::semaphore() {
|
||||
return _impl->semaphore();
|
||||
}
|
||||
|
||||
reader_permit::state reader_permit::get_state() const {
|
||||
return _impl->get_state();
|
||||
}
|
||||
|
||||
bool reader_permit::needs_readmission() const {
|
||||
return _impl->needs_readmission();
|
||||
}
|
||||
@@ -407,6 +467,10 @@ reader_permit::resource_units reader_permit::consume_resources(reader_resources
|
||||
return resource_units(*this, res);
|
||||
}
|
||||
|
||||
future<reader_permit::resource_units> reader_permit::request_memory(size_t memory) {
|
||||
return _impl->request_memory(memory);
|
||||
}
|
||||
|
||||
reader_resources reader_permit::consumed_resources() const {
|
||||
return _impl->resources();
|
||||
}
|
||||
@@ -465,8 +529,11 @@ void reader_permit::on_finish_sstable_read() noexcept {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, reader_permit::state s) {
|
||||
switch (s) {
|
||||
case reader_permit::state::waiting:
|
||||
os << "waiting";
|
||||
case reader_permit::state::waiting_for_admission:
|
||||
os << "waiting_for_admission";
|
||||
break;
|
||||
case reader_permit::state::waiting_for_memory:
|
||||
os << "waiting_for_memory";
|
||||
break;
|
||||
case reader_permit::state::active_unused:
|
||||
os << "active/unused";
|
||||
@@ -658,18 +725,49 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t reader_concurrency_semaphore::get_serialize_limit() const {
|
||||
if (!_serialize_limit_multiplier() || _serialize_limit_multiplier() == std::numeric_limits<uint32_t>::max() || is_unlimited()) [[unlikely]] {
|
||||
return std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
return _initial_resources.memory * _serialize_limit_multiplier();
|
||||
}
|
||||
|
||||
uint64_t reader_concurrency_semaphore::get_kill_limit() const {
|
||||
if (!_kill_limit_multiplier() || _kill_limit_multiplier() == std::numeric_limits<uint32_t>::max() || is_unlimited()) [[unlikely]] {
|
||||
return std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
return _initial_resources.memory * _kill_limit_multiplier();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::consume(reader_permit::impl& permit, resources r) {
|
||||
// We check whether we even reached the memory limit first.
|
||||
// This is a cheap check and should be false most of the time, providing a
|
||||
// cheap short-circuit.
|
||||
if (_resources.memory <= 0 && (consumed_resources().memory + r.memory) >= get_kill_limit()) [[unlikely]] {
|
||||
if (permit.on_oom_kill()) {
|
||||
++_stats.total_reads_killed_due_to_kill_limit;
|
||||
}
|
||||
maybe_dump_reader_permit_diagnostics(*this, _permit_list, "kill limit triggered");
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
_resources -= r;
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
_resources += r;
|
||||
maybe_admit_waiters();
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::reader_concurrency_semaphore(int count, ssize_t memory, sstring name, size_t max_queue_length)
|
||||
reader_concurrency_semaphore::reader_concurrency_semaphore(int count, ssize_t memory, sstring name, size_t max_queue_length,
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier, utils::updateable_value<uint32_t> kill_limit_multiplier)
|
||||
: _initial_resources(count, memory)
|
||||
, _resources(count, memory)
|
||||
, _wait_list(expiry_handler(*this))
|
||||
, _ready_list(max_queue_length)
|
||||
, _name(std::move(name))
|
||||
, _max_queue_length(max_queue_length)
|
||||
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
|
||||
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
|
||||
{ }
|
||||
|
||||
reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring name)
|
||||
@@ -677,7 +775,9 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
|
||||
std::numeric_limits<int>::max(),
|
||||
std::numeric_limits<ssize_t>::max(),
|
||||
std::move(name),
|
||||
std::numeric_limits<size_t>::max()) {}
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max())) {}
|
||||
|
||||
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
if (!_stats.total_permits) {
|
||||
@@ -879,16 +979,23 @@ std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_vi
|
||||
return {};
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read_func func) {
|
||||
future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read_func func, wait_on wait) {
|
||||
if (auto ex = check_queue_size("wait")) {
|
||||
return make_exception_future<>(std::move(ex));
|
||||
}
|
||||
promise<> pr;
|
||||
auto fut = pr.get_future();
|
||||
permit.on_waiting();
|
||||
auto timeout = permit.timeout();
|
||||
_wait_list.push_back(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
|
||||
++_stats.reads_enqueued;
|
||||
if (wait == wait_on::admission) {
|
||||
permit.on_waiting_for_admission();
|
||||
_wait_list.push_to_admission_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
|
||||
++_stats.reads_enqueued_for_admission;
|
||||
} else {
|
||||
permit.on_waiting_for_memory(std::move(fut));
|
||||
fut = permit.get_memory_future();
|
||||
_wait_list.push_to_memory_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
|
||||
++_stats.reads_enqueued_for_memory;
|
||||
}
|
||||
return fut;
|
||||
}
|
||||
|
||||
@@ -914,6 +1021,26 @@ void reader_concurrency_semaphore::evict_readers_in_background() {
|
||||
|
||||
reader_concurrency_semaphore::can_admit
|
||||
reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const noexcept {
|
||||
if (_resources.memory < 0) [[unlikely]] {
|
||||
const auto consumed_memory = consumed_resources().memory;
|
||||
if (consumed_memory >= get_kill_limit()) {
|
||||
return can_admit::no;
|
||||
}
|
||||
if (consumed_memory >= get_serialize_limit()) {
|
||||
if (_blessed_permit) {
|
||||
// blessed permit is never in the wait list
|
||||
return can_admit::no;
|
||||
} else {
|
||||
auto res = permit.get_state() == reader_permit::state::waiting_for_memory ? can_admit::yes : can_admit::no;
|
||||
return res;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
return can_admit::yes;
|
||||
}
|
||||
|
||||
if (!_ready_list.empty()) {
|
||||
return can_admit::no;
|
||||
}
|
||||
@@ -940,7 +1067,7 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r
|
||||
|
||||
const auto admit = can_admit_read(permit);
|
||||
if (admit != can_admit::yes || !_wait_list.empty()) {
|
||||
auto fut = enqueue_waiter(std::move(permit), std::move(func));
|
||||
auto fut = enqueue_waiter(std::move(permit), std::move(func), wait_on::admission);
|
||||
if (admit == can_admit::yes && !_wait_list.empty()) {
|
||||
// This is a contradiction: the semaphore could admit waiters yet it has waiters.
|
||||
// Normally, the semaphore should admit waiters as soon as it can.
|
||||
@@ -967,8 +1094,13 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit)) == can_admit::yes) {
|
||||
auto& x = _wait_list.front();
|
||||
try {
|
||||
x.permit.on_admission();
|
||||
++_stats.reads_admitted;
|
||||
if (x.permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
_blessed_permit = x.permit._impl.get();
|
||||
x.permit.on_granted_memory();
|
||||
} else {
|
||||
x.permit.on_admission();
|
||||
++_stats.reads_admitted;
|
||||
}
|
||||
if (x.func) {
|
||||
_ready_list.push(std::move(x));
|
||||
} else {
|
||||
@@ -985,6 +1117,29 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::request_memory(reader_permit::impl& permit, size_t memory) {
|
||||
// Already blocked on memory?
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
return permit.get_memory_future();
|
||||
}
|
||||
|
||||
if (_resources.memory > 0 || (consumed_resources().memory + memory) < get_serialize_limit()) {
|
||||
permit.on_granted_memory();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
if (!_blessed_permit) {
|
||||
_blessed_permit = &permit;
|
||||
}
|
||||
|
||||
if (_blessed_permit == &permit) {
|
||||
permit.on_granted_memory();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return enqueue_waiter(reader_permit(permit.shared_from_this()), {}, wait_on::memory);
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) {
|
||||
_permit_gate.enter();
|
||||
_permit_list.push_back(permit);
|
||||
@@ -996,6 +1151,10 @@ void reader_concurrency_semaphore::on_permit_destroyed(reader_permit::impl& perm
|
||||
permit.unlink();
|
||||
_permit_gate.leave();
|
||||
--_stats.current_permits;
|
||||
if (_blessed_permit == &permit) {
|
||||
_blessed_permit = nullptr;
|
||||
maybe_admit_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_used() noexcept {
|
||||
@@ -1082,6 +1241,12 @@ std::string reader_concurrency_semaphore::dump_diagnostics(unsigned max_lines) c
|
||||
return os.str();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::foreach_permit(noncopyable_function<void(const reader_permit&)> func) {
|
||||
for (auto& p : _permit_list) {
|
||||
func(reader_permit(p.shared_from_this()));
|
||||
}
|
||||
}
|
||||
|
||||
// A file that tracks the memory usage of buffers resulting from read
|
||||
// operations.
|
||||
class tracking_file_impl : public file_impl {
|
||||
@@ -1153,8 +1318,10 @@ public:
|
||||
}
|
||||
|
||||
virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) override {
|
||||
return get_file_impl(_tracked_file)->dma_read_bulk(offset, range_size, pc).then([this, units = _permit.consume_memory(range_size)] (temporary_buffer<uint8_t> buf) {
|
||||
return make_ready_future<temporary_buffer<uint8_t>>(make_tracked_temporary_buffer(std::move(buf), _permit));
|
||||
return _permit.request_memory(range_size).then([this, offset, range_size, &pc] (reader_permit::resource_units units) {
|
||||
return get_file_impl(_tracked_file)->dma_read_bulk(offset, range_size, pc).then([this, units = std::move(units)] (temporary_buffer<uint8_t> buf) mutable {
|
||||
return make_ready_future<temporary_buffer<uint8_t>>(make_tracked_temporary_buffer(std::move(buf), std::move(units)));
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <seastar/core/expiring_fifo.hh>
|
||||
#include "reader_permit.hh"
|
||||
#include "readers/flat_mutation_reader_v2.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
namespace bi = boost::intrusive;
|
||||
|
||||
@@ -41,6 +42,21 @@ using namespace seastar;
|
||||
/// code can be executed just before throwing (`prethrow_action`
|
||||
/// constructor parameter).
|
||||
///
|
||||
/// The semaphore has 3 layers of defense against consuming more memory
|
||||
/// than desired:
|
||||
/// 1) After memory consumption is larger than the configured memory limit,
|
||||
/// no more reads are admitted
|
||||
/// 2) After memory consumption is larger than `_serialize_limit_multiplier`
|
||||
/// times the configured memory limit, reads are serialized: only one of them
|
||||
/// is allowed to make progress, the rest is made to wait before they can
|
||||
/// consume more memory. Enforced via `request_memory()`.
|
||||
/// 3) After memory consumption is larger than `_kill_limit_multiplier`
|
||||
/// times the configured memory limit, reads are killed, by `consume()`
|
||||
/// throwing `std::bad_alloc`.
|
||||
///
|
||||
/// This makes `_kill_limit_multiplier` times the memory limit the effective
|
||||
/// upper bound of the memory consumed by reads.
|
||||
///
|
||||
/// The semaphore also acts as an execution stage for reads. This
|
||||
/// functionality is exposed via \ref with_permit() and \ref
|
||||
/// with_ready_permit().
|
||||
@@ -71,10 +87,14 @@ public:
|
||||
uint64_t total_failed_reads = 0;
|
||||
// Total number of reads rejected because the admission queue reached its max capacity
|
||||
uint64_t total_reads_shed_due_to_overload = 0;
|
||||
// Total number of reads killed due to the memory consumption reaching the kill limit.
|
||||
uint64_t total_reads_killed_due_to_kill_limit = 0;
|
||||
// Total number of reads admitted, via all admission paths.
|
||||
uint64_t reads_admitted = 0;
|
||||
// Total number of reads enqueued to wait for admission.
|
||||
uint64_t reads_enqueued = 0;
|
||||
uint64_t reads_enqueued_for_admission = 0;
|
||||
// Total number of reads enqueued to wait for memory.
|
||||
uint64_t reads_enqueued_for_memory = 0;
|
||||
// Total number of permits created so far.
|
||||
uint64_t total_permits = 0;
|
||||
// Current number of permits.
|
||||
@@ -177,11 +197,48 @@ private:
|
||||
resources _initial_resources;
|
||||
resources _resources;
|
||||
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _wait_list;
|
||||
class wait_queue {
|
||||
// Stores entries for permits waiting to be admitted.
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _admission_queue;
|
||||
// Stores entries for serialized permits waiting to obtain memory.
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _memory_queue;
|
||||
public:
|
||||
wait_queue(expiry_handler eh) : _admission_queue(eh), _memory_queue(eh) { }
|
||||
size_t size() const {
|
||||
return _admission_queue.size() + _memory_queue.size();
|
||||
}
|
||||
bool empty() const {
|
||||
return _admission_queue.empty() && _memory_queue.empty();
|
||||
}
|
||||
void push_to_admission_queue(entry&& e, db::timeout_clock::time_point timeout) {
|
||||
_admission_queue.push_back(std::move(e), timeout);
|
||||
}
|
||||
void push_to_memory_queue(entry&& e, db::timeout_clock::time_point timeout) {
|
||||
_memory_queue.push_back(std::move(e), timeout);
|
||||
}
|
||||
entry& front() {
|
||||
if (_memory_queue.empty()) {
|
||||
return _admission_queue.front();
|
||||
} else {
|
||||
return _memory_queue.front();
|
||||
}
|
||||
}
|
||||
void pop_front() {
|
||||
if (_memory_queue.empty()) {
|
||||
_admission_queue.pop_front();
|
||||
} else {
|
||||
_memory_queue.pop_front();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
wait_queue _wait_list;
|
||||
queue<entry> _ready_list;
|
||||
|
||||
sstring _name;
|
||||
size_t _max_queue_length = std::numeric_limits<size_t>::max();
|
||||
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
|
||||
utils::updateable_value<uint32_t> _kill_limit_multiplier;
|
||||
inactive_reads_type _inactive_reads;
|
||||
stats _stats;
|
||||
permit_list_type _permit_list;
|
||||
@@ -190,6 +247,7 @@ private:
|
||||
gate _close_readers_gate;
|
||||
gate _permit_gate;
|
||||
std::optional<future<>> _execution_loop_future;
|
||||
reader_permit::impl* _blessed_permit = nullptr;
|
||||
|
||||
private:
|
||||
void do_detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
|
||||
@@ -204,7 +262,8 @@ private:
|
||||
|
||||
// Add the permit to the wait queue and return the future which resolves when
|
||||
// the permit is admitted (popped from the queue).
|
||||
future<> enqueue_waiter(reader_permit permit, read_func func);
|
||||
enum class wait_on { admission, memory };
|
||||
future<> enqueue_waiter(reader_permit permit, read_func func, wait_on wait);
|
||||
void evict_readers_in_background();
|
||||
future<> do_wait_admission(reader_permit permit, read_func func = {});
|
||||
|
||||
@@ -218,6 +277,16 @@ private:
|
||||
|
||||
void maybe_admit_waiters() noexcept;
|
||||
|
||||
// Request more memory for the permit.
|
||||
// Request is instantly granted while memory consumption of all reads is
|
||||
// below `_serialize_limit_multiplier` times the configured memory limit.
|
||||
// After memory consumption goes above the above limit, only one reader
|
||||
// (permit) is allowed to make progress, this method will block for all other
|
||||
// ones, until:
|
||||
// * The blessed read finishes and a new blessed permit is chosen.
|
||||
// * Memory consumption falls below the limit.
|
||||
future<> request_memory(reader_permit::impl& permit, size_t memory);
|
||||
|
||||
void on_permit_created(reader_permit::impl&);
|
||||
void on_permit_destroyed(reader_permit::impl&) noexcept;
|
||||
|
||||
@@ -234,6 +303,13 @@ private:
|
||||
|
||||
future<> execution_loop() noexcept;
|
||||
|
||||
uint64_t get_serialize_limit() const;
|
||||
uint64_t get_kill_limit() const;
|
||||
|
||||
// Throws std::bad_alloc if memory consumed is more than _kill_limit_multiplier times the memory limit.
|
||||
void consume(reader_permit::impl& permit, resources r);
|
||||
void signal(const resources& r) noexcept;
|
||||
|
||||
public:
|
||||
struct no_limits { };
|
||||
|
||||
@@ -243,7 +319,9 @@ public:
|
||||
reader_concurrency_semaphore(int count,
|
||||
ssize_t memory,
|
||||
sstring name,
|
||||
size_t max_queue_length);
|
||||
size_t max_queue_length,
|
||||
utils::updateable_value<uint32_t> serialize_limit_multiplier,
|
||||
utils::updateable_value<uint32_t> kill_limit_multiplier);
|
||||
|
||||
/// Create a semaphore with practically unlimited count and memory.
|
||||
///
|
||||
@@ -258,8 +336,10 @@ public:
|
||||
reader_concurrency_semaphore(for_tests, sstring name,
|
||||
int count = std::numeric_limits<int>::max(),
|
||||
ssize_t memory = std::numeric_limits<ssize_t>::max(),
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max())
|
||||
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length)
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()))
|
||||
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler), std::move(kill_limit_multipler))
|
||||
{}
|
||||
|
||||
virtual ~reader_concurrency_semaphore();
|
||||
@@ -427,12 +507,6 @@ public:
|
||||
return _initial_resources - _resources;
|
||||
}
|
||||
|
||||
void consume(resources r) {
|
||||
_resources -= r;
|
||||
}
|
||||
|
||||
void signal(const resources& r) noexcept;
|
||||
|
||||
size_t waiters() const {
|
||||
return _wait_list.size();
|
||||
}
|
||||
@@ -452,4 +526,6 @@ public:
|
||||
uint64_t active_reads() const noexcept {
|
||||
return _stats.current_permits - _stats.inactive_reads - waiters();
|
||||
}
|
||||
|
||||
void foreach_permit(noncopyable_function<void(const reader_permit&)> func);
|
||||
};
|
||||
|
||||
@@ -79,7 +79,8 @@ public:
|
||||
class blocked_guard;
|
||||
|
||||
enum class state {
|
||||
waiting, // waiting for admission
|
||||
waiting_for_admission,
|
||||
waiting_for_memory,
|
||||
active_unused,
|
||||
active_used,
|
||||
active_blocked,
|
||||
@@ -100,8 +101,12 @@ private:
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout);
|
||||
|
||||
void on_waiting();
|
||||
void on_waiting_for_admission();
|
||||
void on_waiting_for_memory(future<> f);
|
||||
void on_admission();
|
||||
void on_granted_memory();
|
||||
|
||||
future<> get_memory_future();
|
||||
|
||||
void mark_used() noexcept;
|
||||
|
||||
@@ -130,6 +135,8 @@ public:
|
||||
|
||||
reader_concurrency_semaphore& semaphore();
|
||||
|
||||
state get_state() const;
|
||||
|
||||
bool needs_readmission() const;
|
||||
|
||||
// Call only when needs_readmission() = true.
|
||||
@@ -143,6 +150,8 @@ public:
|
||||
|
||||
resource_units consume_resources(reader_resources res);
|
||||
|
||||
future<resource_units> request_memory(size_t memory);
|
||||
|
||||
reader_resources consumed_resources() const;
|
||||
|
||||
reader_resources base_resources() const;
|
||||
@@ -160,6 +169,8 @@ public:
|
||||
|
||||
void on_start_sstable_read() noexcept;
|
||||
void on_finish_sstable_read() noexcept;
|
||||
|
||||
uintptr_t id() { return reinterpret_cast<uintptr_t>(_impl.get()); }
|
||||
};
|
||||
|
||||
using reader_permit_opt = optimized_optional<reader_permit>;
|
||||
@@ -171,7 +182,9 @@ class reader_permit::resource_units {
|
||||
friend class reader_permit;
|
||||
friend class reader_concurrency_semaphore;
|
||||
private:
|
||||
resource_units(reader_permit permit, reader_resources res) noexcept;
|
||||
class already_consumed_tag {};
|
||||
resource_units(reader_permit permit, reader_resources res, already_consumed_tag);
|
||||
resource_units(reader_permit permit, reader_resources res);
|
||||
public:
|
||||
resource_units(const resource_units&) = delete;
|
||||
resource_units(resource_units&&) noexcept;
|
||||
@@ -184,6 +197,8 @@ public:
|
||||
reader_resources resources() const { return _resources; }
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, reader_permit::state s);
|
||||
|
||||
/// Mark a permit as used.
|
||||
///
|
||||
/// Conceptually, a permit is considered used, when at least one reader
|
||||
@@ -234,9 +249,13 @@ public:
|
||||
};
|
||||
|
||||
template <typename Char>
|
||||
temporary_buffer<Char> make_tracked_temporary_buffer(temporary_buffer<Char> buf, reader_permit& permit) {
|
||||
return temporary_buffer<Char>(buf.get_write(), buf.size(),
|
||||
make_deleter(buf.release(), [units = permit.consume_memory(buf.size())] () mutable { units.reset(); }));
|
||||
temporary_buffer<Char> make_tracked_temporary_buffer(temporary_buffer<Char> buf, reader_permit::resource_units units) {
|
||||
return temporary_buffer<Char>(buf.get_write(), buf.size(), make_object_deleter(buf.release(), std::move(units)));
|
||||
}
|
||||
|
||||
inline temporary_buffer<char> make_new_tracked_temporary_buffer(size_t size, reader_permit& permit) {
|
||||
auto buf = temporary_buffer<char>(size);
|
||||
return temporary_buffer<char>(buf.get_write(), buf.size(), make_object_deleter(buf.release(), permit.consume_memory(size)));
|
||||
}
|
||||
|
||||
file make_tracked_file(file f, reader_permit p);
|
||||
|
||||
@@ -328,14 +328,18 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _read_concurrency_sem(max_count_concurrent_reads,
|
||||
max_memory_concurrent_reads(),
|
||||
"_read_concurrency_sem",
|
||||
max_inactive_queue_length())
|
||||
max_inactive_queue_length(),
|
||||
_cfg.reader_concurrency_semaphore_serialize_limit_multiplier,
|
||||
_cfg.reader_concurrency_semaphore_kill_limit_multiplier)
|
||||
// No timeouts or queue length limits - a failure here can kill an entire repair.
|
||||
// Trust the caller to limit concurrency.
|
||||
, _streaming_concurrency_sem(
|
||||
max_count_streaming_concurrent_reads,
|
||||
max_memory_streaming_concurrent_reads(),
|
||||
"_streaming_concurrency_sem",
|
||||
std::numeric_limits<size_t>::max())
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()))
|
||||
// No limits, just for accounting.
|
||||
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction")
|
||||
, _system_read_concurrency_sem(
|
||||
@@ -343,7 +347,9 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
max_count_concurrent_reads,
|
||||
max_memory_system_concurrent_reads(),
|
||||
"_system_read_concurrency_sem",
|
||||
std::numeric_limits<size_t>::max())
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()))
|
||||
, _row_cache_tracker(cache_tracker::register_metrics::yes)
|
||||
, _apply_stage("db_apply", &database::do_apply)
|
||||
, _version(empty_version)
|
||||
|
||||
@@ -2058,37 +2058,31 @@ class scylla_memory(gdb.Command):
|
||||
|
||||
gdb.write('\n')
|
||||
|
||||
@staticmethod
|
||||
def format_semaphore_stats(semaphore):
|
||||
semaphore_name = "{} sstable reads:".format(str(semaphore['_name'])[1:-1].split("_")[1])
|
||||
initial_count = int(semaphore["_initial_resources"]["count"])
|
||||
initial_memory = int(semaphore["_initial_resources"]["memory"])
|
||||
used_count = initial_count - int(semaphore["_resources"]["count"])
|
||||
used_memory = initial_memory - int(semaphore["_resources"]["memory"])
|
||||
try:
|
||||
waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"])
|
||||
except gdb.error: # 5.1 compatibility
|
||||
waiters = int(semaphore["_wait_list"]["_size"])
|
||||
return f'{semaphore_name:<24} {used_count:>3}/{initial_count:>3}, {used_memory:>13}/{initial_memory:>13}, queued: {waiters}'
|
||||
|
||||
@staticmethod
|
||||
def print_replica_stats():
|
||||
db = find_db()
|
||||
if not db:
|
||||
return
|
||||
|
||||
try:
|
||||
mem_stats = dict()
|
||||
for key, sem in [('user_mem_str', db['_read_concurrency_sem']), ('streaming_mem_str', db['_streaming_concurrency_sem']), ('system_mem_str', db['_system_read_concurrency_sem'])]:
|
||||
mem_stats[key] = '{:>13}/{:>13} B'.format(int(sem['_initial_resources']['memory'] - sem['_resources']['memory']), int(sem['_initial_resources']['memory']))
|
||||
except gdb.error: # <= 4.2 compatibility
|
||||
for key, sem in [('user_mem_str', db['_read_concurrency_sem']), ('streaming_mem_str', db['_streaming_concurrency_sem']), ('system_mem_str', db['_system_read_concurrency_sem'])]:
|
||||
mem_stats[key] = 'remaining mem: {:>13} B'.format(int(sem['_resources']['memory']))
|
||||
|
||||
database_typename = lookup_type(['replica::database', 'database'])[1].name
|
||||
gdb.write('Replica:\n')
|
||||
gdb.write(' Read Concurrency Semaphores:\n'
|
||||
' user sstable reads: {user_sst_rd_count:>3}/{user_sst_rd_max_count:>3}, {user_mem_str}, queued: {user_sst_rd_queued}\n'
|
||||
' streaming sstable reads: {streaming_sst_rd_count:>3}/{streaming_sst_rd_max_count:>3}, {streaming_mem_str}, queued: {streaming_sst_rd_queued}\n'
|
||||
' system sstable reads: {system_sst_rd_count:>3}/{system_sst_rd_max_count:>3}, {system_mem_str}, queued: {system_sst_rd_queued}\n'
|
||||
.format(
|
||||
user_sst_rd_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_concurrent_reads')) - int(db['_read_concurrency_sem']['_resources']['count']),
|
||||
user_sst_rd_max_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_concurrent_reads')),
|
||||
user_sst_rd_queued=int(db['_read_concurrency_sem']['_wait_list']['_size']),
|
||||
streaming_sst_rd_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_streaming_concurrent_reads')) - int(db['_streaming_concurrency_sem']['_resources']['count']),
|
||||
streaming_sst_rd_max_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_streaming_concurrent_reads')),
|
||||
streaming_sst_rd_queued=int(db['_streaming_concurrency_sem']['_wait_list']['_size']),
|
||||
system_sst_rd_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_system_concurrent_reads')) - int(db['_system_read_concurrency_sem']['_resources']['count']),
|
||||
system_sst_rd_max_count=int(gdb.parse_and_eval(f'{database_typename}::max_count_system_concurrent_reads')),
|
||||
system_sst_rd_queued=int(db['_system_read_concurrency_sem']['_wait_list']['_size']),
|
||||
**mem_stats))
|
||||
gdb.write(' Read Concurrency Semaphores:\n {}\n {}\n {}\n'.format(
|
||||
scylla_memory.format_semaphore_stats(db['_read_concurrency_sem']),
|
||||
scylla_memory.format_semaphore_stats(db['_streaming_concurrency_sem']),
|
||||
scylla_memory.format_semaphore_stats(db['_system_read_concurrency_sem'])))
|
||||
|
||||
gdb.write(' Execution Stages:\n')
|
||||
for es_path in [('_apply_stage',)]:
|
||||
@@ -4285,7 +4279,7 @@ class scylla_generate_object_graph(gdb.Command):
|
||||
at. The generated graph is an image, which allows the visual inspection of the
|
||||
object graph.
|
||||
|
||||
The graph is generated with the help of `graphwiz`. The command
|
||||
The graph is generated with the help of `graphviz`. The command
|
||||
generates `.dot` files which can be converted to images with the help of
|
||||
the `dot` utility. The command can do this if the output file is one of
|
||||
the supported image formats (e.g. `png`), otherwise only the `.dot` file
|
||||
@@ -4398,7 +4392,7 @@ class scylla_generate_object_graph(gdb.Command):
|
||||
help="Output file. Supported extensions are: dot, png, jpg, jpeg, svg and pdf."
|
||||
" Regardless of the extension, a `.dot` file will always be generated."
|
||||
" If the output is one of the graphic formats the command will convert the `.dot` file using the `dot` utility."
|
||||
" In this case the dot utility from the graphwiz suite has to be installed on the machine."
|
||||
" In this case the dot utility from the graphviz suite has to be installed on the machine."
|
||||
" To manually convert the `.dot` file do: `dot -Tpng graph.dot -o graph.png`.")
|
||||
parser.add_argument("-d", "--max-depth", action="store", type=int, default=5,
|
||||
help="Maximum depth to traverse the object graph. Set to -1 for unlimited depth. Default is 5.")
|
||||
@@ -5154,11 +5148,17 @@ class scylla_read_stats(gdb.Command):
|
||||
else:
|
||||
inactive_read_count = len(intrusive_list(semaphore['_inactive_reads']))
|
||||
|
||||
|
||||
try:
|
||||
waiters = int(semaphore["_wait_list"]["_admission_queue"]["_size"])
|
||||
except gdb.error: # 5.1 compatibility
|
||||
waiters = int(semaphore["_wait_list"]["_size"])
|
||||
|
||||
gdb.write("Semaphore {} with: {}/{} count and {}/{} memory resources, queued: {}, inactive={}\n".format(
|
||||
semaphore_name,
|
||||
initial_count - int(semaphore['_resources']['count']), initial_count,
|
||||
initial_memory - int(semaphore['_resources']['memory']), initial_memory,
|
||||
int(semaphore['_wait_list']['_size']), inactive_read_count))
|
||||
waiters, inactive_read_count))
|
||||
|
||||
gdb.write("{:>10} {:5} {:>12} {}\n".format('permits', 'count', 'memory', 'table/description/state'))
|
||||
|
||||
|
||||
@@ -388,8 +388,9 @@ public:
|
||||
if (_pos != _beg_pos && addr.offset != 0) {
|
||||
throw std::runtime_error("compressed reader out of sync");
|
||||
}
|
||||
return _input_stream->read_exactly(addr.chunk_len).
|
||||
then([this, addr](temporary_buffer<char> buf) {
|
||||
return _input_stream->read_exactly(addr.chunk_len).then([this, addr](temporary_buffer<char> buf) {
|
||||
return _permit.request_memory(_compression_metadata->uncompressed_chunk_length()).then(
|
||||
[this, addr, buf = std::move(buf)] (reader_permit::resource_units res_units) mutable {
|
||||
// The last 4 bytes of the chunk are the adler32/crc32 checksum
|
||||
// of the rest of the (compressed) chunk.
|
||||
auto compressed_len = addr.chunk_len - 4;
|
||||
@@ -415,7 +416,8 @@ public:
|
||||
_pos += out.size();
|
||||
_underlying_pos += addr.chunk_len;
|
||||
|
||||
return make_tracked_temporary_buffer(std::move(out), _permit);
|
||||
return make_tracked_temporary_buffer(std::move(out), std::move(res_units));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -134,7 +134,7 @@ private:
|
||||
return read_status::ready;
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
@@ -208,7 +208,7 @@ public:
|
||||
} else {
|
||||
// copy what we have so far, read the rest later
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
@@ -273,7 +273,7 @@ public:
|
||||
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
@@ -298,7 +298,7 @@ public:
|
||||
return read_bytes(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes.clear();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
|
||||
@@ -25,7 +25,9 @@ sstables_manager::sstables_manager(
|
||||
max_count_sstable_metadata_concurrent_reads,
|
||||
max_memory_sstable_metadata_concurrent_reads(available_memory),
|
||||
"sstable_metadata_concurrency_sem",
|
||||
std::numeric_limits<size_t>::max())
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
|
||||
utils::updateable_value(std::numeric_limits<uint32_t>::max()))
|
||||
, _dir_semaphore(dir_sem)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -674,11 +674,8 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0);
|
||||
|
||||
// Drain all resources of the semaphore
|
||||
const auto available_resources = semaphore.available_resources();
|
||||
semaphore.consume(available_resources);
|
||||
auto release_resources = defer([&semaphore, available_resources] {
|
||||
semaphore.signal(available_resources);
|
||||
});
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout);
|
||||
auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources());
|
||||
|
||||
auto cmd2 = query::read_command(s->id(),
|
||||
s->version(),
|
||||
|
||||
@@ -9,18 +9,25 @@
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include <seastar/core/file.hh>
|
||||
#include "reader_concurrency_semaphore.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/random_schema.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include "readers/empty_v2.hh"
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
#include "replica/database.hh" // new_reader_base_cost is there :(
|
||||
#include "db/config.hh"
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) {
|
||||
simple_schema s;
|
||||
@@ -136,8 +143,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources());
|
||||
|
||||
if (i % 2) {
|
||||
const auto consumed_resources = semaphore.available_resources();
|
||||
semaphore.consume(consumed_resources);
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources());
|
||||
|
||||
auto fut = make_ready_future<>();
|
||||
if (permit->needs_readmission()) {
|
||||
@@ -145,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
}
|
||||
BOOST_REQUIRE(!fut.available());
|
||||
|
||||
semaphore.signal(consumed_resources);
|
||||
consumed_resources.reset();
|
||||
fut.get();
|
||||
} else {
|
||||
if (permit->needs_readmission()) {
|
||||
@@ -386,7 +393,7 @@ class dummy_file_impl : public file_impl {
|
||||
}
|
||||
|
||||
virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) override {
|
||||
temporary_buffer<uint8_t> buf(1024);
|
||||
temporary_buffer<uint8_t> buf(range_size);
|
||||
|
||||
memset(buf.get_write(), 0xff, buf.size());
|
||||
|
||||
@@ -405,23 +412,23 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
|
||||
BOOST_REQUIRE_EQUAL(4 * 1024, semaphore.available_resources().memory);
|
||||
|
||||
auto buf1 = tracked_file.dma_read_bulk<char>(0, 0).get0();
|
||||
auto buf1 = tracked_file.dma_read_bulk<char>(0, 1024).get0();
|
||||
BOOST_REQUIRE_EQUAL(3 * 1024, semaphore.available_resources().memory);
|
||||
|
||||
auto buf2 = tracked_file.dma_read_bulk<char>(0, 0).get0();
|
||||
auto buf2 = tracked_file.dma_read_bulk<char>(0, 1024).get0();
|
||||
BOOST_REQUIRE_EQUAL(2 * 1024, semaphore.available_resources().memory);
|
||||
|
||||
auto buf3 = tracked_file.dma_read_bulk<char>(0, 0).get0();
|
||||
auto buf3 = tracked_file.dma_read_bulk<char>(0, 1024).get0();
|
||||
BOOST_REQUIRE_EQUAL(1 * 1024, semaphore.available_resources().memory);
|
||||
|
||||
auto buf4 = tracked_file.dma_read_bulk<char>(0, 0).get0();
|
||||
auto buf4 = tracked_file.dma_read_bulk<char>(0, 1024).get0();
|
||||
BOOST_REQUIRE_EQUAL(0 * 1024, semaphore.available_resources().memory);
|
||||
|
||||
auto buf5 = tracked_file.dma_read_bulk<char>(0, 0).get0();
|
||||
auto buf5 = tracked_file.dma_read_bulk<char>(0, 1024).get0();
|
||||
BOOST_REQUIRE_EQUAL(-1 * 1024, semaphore.available_resources().memory);
|
||||
|
||||
// Reassing buf1, should still have the same amount of units.
|
||||
buf1 = tracked_file.dma_read_bulk<char>(0, 0).get0();
|
||||
buf1 = tracked_file.dma_read_bulk<char>(0, 1024).get0();
|
||||
BOOST_REQUIRE_EQUAL(-1 * 1024, semaphore.available_resources().memory);
|
||||
|
||||
// Move buf1 to the heap, so that we can safely destroy it
|
||||
@@ -619,7 +626,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted + uint64_t(can_admit));
|
||||
// Deliberately not checking `reads_enqueued`, a read can be enqueued temporarily during the admission process.
|
||||
// Deliberately not checking `reads_enqueued_for_admission`, a read can be enqueued temporarily during the admission process.
|
||||
|
||||
if (can_admit == expected_can_admit) {
|
||||
testlog.trace("admission scenario '{}' with expected_can_admit={} passed at {}:{}", description, expected_can_admit, sl.file_name(),
|
||||
@@ -644,7 +651,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE(!enqueued_permit_fut.available());
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued, stats_before.reads_enqueued + 1);
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued_for_admission, stats_before.reads_enqueued_for_admission + 1);
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
}
|
||||
@@ -682,10 +689,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// forward progress -- resources
|
||||
{
|
||||
const auto resources = reader_resources::with_memory(semaphore.available_resources().memory);
|
||||
semaphore.consume(resources);
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout);
|
||||
sponge_permit.consume_resources(reader_resources::with_memory(semaphore.available_resources().memory));
|
||||
require_can_admit(true, "semaphore with no memory but all count available");
|
||||
semaphore.signal(resources);
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources);
|
||||
require_can_admit(true, "semaphore in initial state");
|
||||
@@ -711,7 +717,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted + 1);
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued, stats_before.reads_enqueued);
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued_for_admission, stats_before.reads_enqueued_for_admission);
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources);
|
||||
require_can_admit(true, "semaphore in initial state");
|
||||
@@ -768,7 +774,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted);
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued, stats_before.reads_enqueued + 1);
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued_for_admission, stats_before.reads_enqueued_for_admission + 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
|
||||
auto cookie2 = post_enqueue_hook(cookie1);
|
||||
@@ -1007,7 +1013,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
// * no more count resources left
|
||||
auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
@@ -1068,7 +1074,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(1, 3 * 1024));
|
||||
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
|
||||
semaphore.set_resources({4, 4 * 1024});
|
||||
@@ -1077,3 +1083,492 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024));
|
||||
permit3_fut.get();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class allocating_reader {
|
||||
static constexpr size_t admission_cost = 1024;
|
||||
static constexpr size_t buf_size = 1024;
|
||||
static constexpr size_t read_iterations = 4;
|
||||
public:
|
||||
enum class state {
|
||||
wait_for_admission,
|
||||
request_memory,
|
||||
wait_for_memory,
|
||||
release_memory,
|
||||
done,
|
||||
};
|
||||
const char* to_string(state s) {
|
||||
switch (s) {
|
||||
case state::wait_for_admission: return "state::wait_for_admission";
|
||||
case state::request_memory: return "state::request_memory";
|
||||
case state::wait_for_memory: return "state::wait_for_memory";
|
||||
case state::release_memory: return "state::release_memory";
|
||||
case state::done: return "state::done";
|
||||
}
|
||||
};
|
||||
private:
|
||||
reader_concurrency_semaphore& _sem;
|
||||
state _state = state::wait_for_admission;
|
||||
std::optional<future<>> _admission_fut;
|
||||
std::optional<reader_permit> _permit;
|
||||
std::list<reader_permit::resource_units> _current_resource_units;
|
||||
std::list<future<reader_permit::resource_units>> _pending_resource_units;
|
||||
unsigned _read_count = 0;
|
||||
bool _success = true;
|
||||
public:
|
||||
explicit allocating_reader(reader_concurrency_semaphore& sem) : _sem(sem) {
|
||||
testlog.debug("[{}] allocating_reader created", fmt::ptr(this));
|
||||
_admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout).then_wrapped([this] (future<reader_permit>&& permit_fut) {
|
||||
try {
|
||||
_permit = std::move(permit_fut.get());
|
||||
_state = state::request_memory;
|
||||
} catch (...) {
|
||||
_state = state::done;
|
||||
_success = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
~allocating_reader() { }
|
||||
void operator()() {
|
||||
testlog.debug("[{}|p:0x{:x}] allocating_reader(): _state={}, _permit.state={}, _permit.resources={}, _sem.resources={}",
|
||||
fmt::ptr(this),
|
||||
_permit ? _permit->id() : 0,
|
||||
to_string(_state),
|
||||
_permit ? format("{}", _permit->get_state()) : "N/A",
|
||||
_permit ? _permit->consumed_resources() : reader_resources{},
|
||||
_sem.consumed_resources());
|
||||
switch (_state) {
|
||||
case state::wait_for_admission:
|
||||
break;
|
||||
case state::request_memory:
|
||||
{
|
||||
size_t n = 0;
|
||||
if (!_read_count) {
|
||||
n = 1;
|
||||
} else {
|
||||
n = tests::random::get_int(1, 8);
|
||||
}
|
||||
++_read_count;
|
||||
try {
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
_pending_resource_units.emplace_back(_permit->request_memory(buf_size));
|
||||
}
|
||||
} catch (std::bad_alloc&) {
|
||||
testlog.debug("[{}|p:{}] read killed", fmt::ptr(this), _permit ? _permit->id() : 0);
|
||||
_read_count = read_iterations;
|
||||
}
|
||||
_state = state::wait_for_memory;
|
||||
break;
|
||||
}
|
||||
case state::wait_for_memory:
|
||||
for (auto it = _pending_resource_units.begin(); it != _pending_resource_units.end();) {
|
||||
if (it->available()) {
|
||||
try {
|
||||
_current_resource_units.push_back(it->get());
|
||||
} catch (std::bad_alloc&) {
|
||||
testlog.debug("[{}|p:{}] read killed", fmt::ptr(this), _permit ? _permit->id() : 0);
|
||||
_read_count = read_iterations;
|
||||
}
|
||||
it = _pending_resource_units.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
if (_pending_resource_units.empty()) {
|
||||
_state = state::release_memory;
|
||||
}
|
||||
break;
|
||||
case state::release_memory:
|
||||
if (_current_resource_units.empty()) {
|
||||
if (_read_count == read_iterations) {
|
||||
_state = state::done;
|
||||
} else if (!tests::random::get_int(0, 7)) {
|
||||
_state = state::done;
|
||||
} else {
|
||||
_state = state::request_memory;
|
||||
}
|
||||
} else {
|
||||
_current_resource_units.pop_front();
|
||||
}
|
||||
break;
|
||||
case state::done:
|
||||
_permit.reset();
|
||||
break;
|
||||
}
|
||||
}
|
||||
bool done() const { return _state == state::done; }
|
||||
bool success() const { return _success; }
|
||||
reader_resources resources() const { return _permit ? _permit->consumed_resources() : reader_resources{}; }
|
||||
future<> close() {
|
||||
if (_admission_fut) {
|
||||
co_await std::move(_admission_fut).value();
|
||||
}
|
||||
co_await coroutine::parallel_for_each(_pending_resource_units.begin(), _pending_resource_units.end(), [this] (future<reader_permit::resource_units>& fut) {
|
||||
return std::move(fut).then_wrapped([] (future<reader_permit::resource_units>&& fut) {
|
||||
try {
|
||||
fut.get();
|
||||
} catch (...) {
|
||||
}
|
||||
});
|
||||
});
|
||||
_current_resource_units.clear();
|
||||
_permit.reset();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
void dump_stats(const reader_concurrency_semaphore& semaphore, log_level lvl, bool with_diag = false) {
|
||||
const auto& stats = semaphore.get_stats();
|
||||
testlog.log(lvl, "stats = {{\n"
|
||||
"\t.permit_based_evictions = {}\n"
|
||||
"\t.time_based_evictions = {}\n"
|
||||
"\t.inactive_reads = {}\n"
|
||||
"\t.total_successful_reads = {}\n"
|
||||
"\t.total_failed_reads = {}\n"
|
||||
"\t.total_reads_shed_due_to_overload = {}\n"
|
||||
"\t.total_reads_killed_due_to_kill_limit = {}\n"
|
||||
"\t.reads_admitted = {}\n"
|
||||
"\t.reads_enqueued_for_admission = {}\n"
|
||||
"\t.reads_enqueued_for_memory = {}\n"
|
||||
"\t.total_permits = {}\n"
|
||||
"\t.current_permits = {}\n"
|
||||
"\t.used_permits = {}\n"
|
||||
"\t.blocked_permits = {}\n"
|
||||
"}}{}",
|
||||
stats.permit_based_evictions,
|
||||
stats.time_based_evictions,
|
||||
stats.inactive_reads,
|
||||
stats.total_successful_reads,
|
||||
stats.total_failed_reads,
|
||||
stats.total_reads_shed_due_to_overload,
|
||||
stats.total_reads_killed_due_to_kill_limit,
|
||||
stats.reads_admitted,
|
||||
stats.reads_enqueued_for_admission,
|
||||
stats.reads_enqueued_for_memory,
|
||||
stats.total_permits,
|
||||
stats.current_permits,
|
||||
stats.used_permits,
|
||||
stats.blocked_permits,
|
||||
with_diag ? format("\n{}", semaphore.dump_diagnostics()) : "");
|
||||
};
|
||||
|
||||
} //anonymous namespace
|
||||
|
||||
// Check that the memory consumption limiting mechanism doesn't leak any
|
||||
// resources or cause any internal consistencies in the semaphore.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_leaks) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{4, 4 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
const auto kill_multiplier = 3;
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
const size_t reader_count_target = 6;
|
||||
const size_t iteration_limit = 100;
|
||||
|
||||
std::list<allocating_reader> readers;
|
||||
|
||||
size_t i = 0;
|
||||
bool done = false;
|
||||
sstring error = "";
|
||||
while (!done) {
|
||||
testlog.debug("iteration {}", i);
|
||||
|
||||
for (auto& rd : readers) {
|
||||
rd();
|
||||
reader_resources all_permit_res;
|
||||
semaphore.foreach_permit([&all_permit_res] (const reader_permit& p) { all_permit_res += p.consumed_resources(); });
|
||||
if (semaphore.consumed_resources() != all_permit_res) {
|
||||
testlog.error("resource mismatch: semaphore.consumed_resources() ({}) != sum of resources in permits ({})", semaphore.consumed_resources(), all_permit_res);
|
||||
}
|
||||
}
|
||||
|
||||
if (readers.size() < reader_count_target) {
|
||||
readers.emplace_back(semaphore);
|
||||
}
|
||||
done = std::all_of(readers.begin(), readers.end(), std::mem_fn(&allocating_reader::done));
|
||||
|
||||
dump_stats(semaphore, log_level::debug, true);
|
||||
|
||||
reader_resources all_permit_res;
|
||||
semaphore.foreach_permit([&all_permit_res] (const reader_permit& p) { all_permit_res += p.consumed_resources(); });
|
||||
|
||||
if (semaphore.consumed_resources().memory >= (semaphore.initial_resources().memory * kill_multiplier)) {
|
||||
error = format("kill limit failed: semaphore.consumed_resources() ({}) >= kill limit ({})", semaphore.consumed_resources().memory, (semaphore.initial_resources().memory * kill_multiplier));
|
||||
} else if (semaphore.consumed_resources() != all_permit_res) {
|
||||
error = format("resource mismatch: semaphore.consumed_resources() ({}) != sum of resources in permits ({})", semaphore.consumed_resources(), all_permit_res);
|
||||
} else if (i >= iteration_limit) {
|
||||
error = format("test failed to finish in {} iterations", iteration_limit);
|
||||
}
|
||||
|
||||
if (error.empty()) {
|
||||
++i;
|
||||
} else {
|
||||
testlog.error("stopping test at iteration {}: {}", i, error);
|
||||
done = true;
|
||||
}
|
||||
|
||||
seastar::thread::yield();
|
||||
}
|
||||
dump_stats(semaphore, log_level::info, false);
|
||||
parallel_for_each(readers.begin(), readers.end(), [] (allocating_reader& rd) {
|
||||
return rd.close();
|
||||
}).get();
|
||||
if (!error.empty()) {
|
||||
BOOST_FAIL(error);
|
||||
}
|
||||
const bool all_ok = std::all_of(readers.begin(), readers.end(), std::mem_fn(&allocating_reader::success));
|
||||
BOOST_REQUIRE(all_ok);
|
||||
}
|
||||
|
||||
struct memory_limit_table {
|
||||
schema_ptr schema;
|
||||
tmpdir sst_dir;
|
||||
int32_t pk;
|
||||
int32_t ck;
|
||||
sstring value;
|
||||
};
|
||||
// Creates ks.tbl with a single large row and writes it out to
// `target_num_sstables` sstables, so that reading the partition has to merge
// data from many sstables and therefore consumes a lot of memory.
// Compaction is disabled on the table so the sstable count stays as written.
// Returns the pieces of state the caller needs to query the table.
memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_num_sstables) {
    auto& db = env.local_db();

    sstring value(256 * 1024, '0');

    env.execute_cql("CREATE TABLE ks.tbl (pk int, ck int, value text, primary key (pk, ck)) WITH compaction = {'class': 'NullCompactionStrategy'};").get();

    env.require_table_exists("ks", "tbl").get();

    auto& tbl = db.find_column_family("ks", "tbl");
    auto s = tbl.schema();
    auto& sst_man = tbl.get_sstables_manager();
    auto& semaphore = db.get_reader_concurrency_semaphore();

    // Find a partition key that belongs to this shard, so the reads issued by
    // the tests are served locally.
    int32_t pk = 0;
    dht::decorated_key dk(dht::token(), partition_key::make_empty());
    do {
        dk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(++pk)));
    } while (dht::shard_of(*s, dk.token()) != this_shard_id());

    const int32_t ck = 0;

    mutation mut(s, std::move(dk));
    mut.set_clustered_cell(clustering_key::from_single_value(*s, serialized(ck)), to_bytes("value"), data_value(value), 0);

    auto sstables_dir = tmpdir();

    const auto sstable_write_concurrency = 16;

    // Unsigned to match target_num_sstables, avoiding a signed/unsigned
    // comparison in the loop condition below.
    uint64_t num_sstables = 0;
    parallel_for_each(boost::irange(0, sstable_write_concurrency), [&] (int i) {
        return seastar::async([&, i] {
            // The check-then-increment below needs no synchronization: all
            // writer fibers run on the same shard and there is no preemption
            // point between the check and the increment.
            while (num_sstables != target_num_sstables) {
                ++num_sstables;
                auto sst = sst_man.make_sstable(s, sstables_dir.path().string(), sstables::generation_type{static_cast<int64_t>(num_sstables)}, sst_man.get_highest_supported_format(), sstables::sstable_format_types::big);
                auto writer_cfg = sst_man.configure_writer("test");
                sst->write_components(
                        make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout), mut, s->full_slice()),
                        1, // partition estimate: the reader produces a single partition
                        s,
                        writer_cfg,
                        encoding_stats{}).get();
                sst->load().get();
                tbl.add_sstable_and_update_cache(std::move(sst)).get();
            }
        });
    }).get();

    return {s, std::move(sstables_dir), pk, ck, std::move(value)};
}
|
||||
|
||||
// The memory-limit tests below are calibrated for exactly this much total
// memory; they refuse to run (and pass trivially) with any other amount.
constexpr uint64_t target_memory = uint64_t(1) << 28; // 256MB
|
||||
|
||||
// Check that the memory consumption limiting mechanism of the semaphore does
|
||||
// prevent OOM crashes.
|
||||
// The test fails by OOM crashing.
|
||||
// This test should be run with 256MB of memory.
|
||||
SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_oom) {
|
||||
if (memory::stats().total_memory() != target_memory) {
|
||||
std::cerr << "Test " << get_name() << " should be run with 256M of memory, make sure you invoke with -m256M" << std::endl;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
|
||||
// Disable the cache altogether, we want all reads to go to disk.
|
||||
db_cfg.enable_cache(false);
|
||||
db_cfg.enable_commitlog(false);
|
||||
db_cfg.reader_concurrency_semaphore_serialize_limit_multiplier.set(2, utils::config_file::config_source::CommandLine);
|
||||
db_cfg.reader_concurrency_semaphore_kill_limit_multiplier.set(4, utils::config_file::config_source::CommandLine);
|
||||
|
||||
return do_with_cql_env_thread([] (cql_test_env& env) {
|
||||
auto tbl = create_memory_limit_table(env, 256);
|
||||
|
||||
auto& db = env.local_db();
|
||||
auto& semaphore = db.get_reader_concurrency_semaphore();
|
||||
|
||||
const auto num_reads = 128;
|
||||
|
||||
auto read_id = env.prepare("SELECT value FROM ks.tbl WHERE pk = ? AND ck = ?").get0();
|
||||
|
||||
parallel_for_each(boost::irange(0, num_reads), [&] (int i) {
|
||||
return env.execute_prepared(read_id, {cql3::raw_value::make_value(serialized(tbl.pk)), cql3::raw_value::make_value(serialized(tbl.ck))}).then_wrapped(
|
||||
[&] (future<shared_ptr<cql_transport::messages::result_message>> fut) {
|
||||
if (fut.failed()) {
|
||||
// We expect failed, OOM-killed reads here.
|
||||
// No way to verify why they failed so we swallow all failures.
|
||||
fut.ignore_ready_future();
|
||||
return;
|
||||
}
|
||||
assert_that(fut.get()).is_rows().with_rows_ignore_order({ {serialized(tbl.value)} });
|
||||
});
|
||||
}).get();
|
||||
return make_ready_future<>();
|
||||
}, std::move(db_cfg_ptr));
|
||||
}
|
||||
|
||||
// Check that the memory consumption limiting mechanism of the semaphore does
|
||||
// prevent reads exhausting memory to the extent that they start to fail due to
|
||||
// bad alloc (but not necessarily crash the node).
|
||||
// Instead the limiting mechanism engages and kills reads before they get to that
|
||||
// point. From the outset, a read failing due to OOM and a read killed by the
|
||||
// limiting mechanism looks the same. To differentiate, the test checks that all
|
||||
// failures were caused by the limiting mechanism.
|
||||
// This test should be run with 256M memory.
|
||||
SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_engages) {
|
||||
if (memory::stats().total_memory() != target_memory) {
|
||||
std::cerr << "Test " << get_name() << " should be run with 256M of memory, make sure you invoke with -m256M" << std::endl;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
|
||||
// Disable the cache altogether, we want all reads to go to disk.
|
||||
db_cfg.enable_cache(false);
|
||||
db_cfg.enable_commitlog(false);
|
||||
db_cfg.reader_concurrency_semaphore_serialize_limit_multiplier.set(2, utils::config_file::config_source::CommandLine);
|
||||
db_cfg.reader_concurrency_semaphore_kill_limit_multiplier.set(4, utils::config_file::config_source::CommandLine);
|
||||
|
||||
return do_with_cql_env_thread([] (cql_test_env& env) {
|
||||
auto tbl = create_memory_limit_table(env, 32);
|
||||
|
||||
auto& db = env.local_db();
|
||||
auto& semaphore = db.get_reader_concurrency_semaphore();
|
||||
|
||||
const auto num_reads = 128;
|
||||
|
||||
auto read_id = env.prepare("SELECT value FROM ks.tbl WHERE pk = ? AND ck = ?").get0();
|
||||
|
||||
// We first check that the test params are not too strict and a single
|
||||
// read can finish successfully.
|
||||
try {
|
||||
auto msg = env.execute_prepared(read_id, {cql3::raw_value::make_value(serialized(tbl.pk)), cql3::raw_value::make_value(serialized(tbl.ck))}).get();
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({ {serialized(tbl.value)} });
|
||||
} catch (...) {
|
||||
BOOST_FAIL(fmt::format("canary read failed with: {}", std::current_exception()));
|
||||
}
|
||||
|
||||
uint64_t successful_reads = 0;
|
||||
uint64_t failed_reads = 0;
|
||||
|
||||
parallel_for_each(boost::irange(0, num_reads), [&] (int i) {
|
||||
return env.execute_prepared(read_id, {cql3::raw_value::make_value(serialized(tbl.pk)), cql3::raw_value::make_value(serialized(tbl.ck))}).then_wrapped(
|
||||
[&] (future<shared_ptr<cql_transport::messages::result_message>> fut) {
|
||||
if (fut.failed()) {
|
||||
// We expect failed, OOM-killed reads here.
|
||||
// No way to verify why they failed so we swallow all failures.
|
||||
fut.ignore_ready_future();
|
||||
++failed_reads;
|
||||
return;
|
||||
}
|
||||
assert_that(fut.get()).is_rows().with_rows_ignore_order({ {serialized(tbl.value)} });
|
||||
++successful_reads;
|
||||
});
|
||||
}).get();
|
||||
|
||||
testlog.info("total reads: {} ({} successful, {} failed)", num_reads, successful_reads, failed_reads);
|
||||
dump_stats(semaphore, log_level::info, false);
|
||||
|
||||
// There should be both successful and failed reads.
|
||||
// If there is only one or the other, the test is not testing anything.
|
||||
// We also check that the memory limiting mechanism of the semaphore was engaged.
|
||||
// The test is meaningless without it.
|
||||
// In the slow debug builds we never reach the kill limit for some reason.
|
||||
#ifndef DEBUG
|
||||
BOOST_REQUIRE_GE(failed_reads, 1);
|
||||
#endif
|
||||
BOOST_REQUIRE_GE(successful_reads, 1);
|
||||
|
||||
// Almost each failed read should have been in the memory queue at one point.
|
||||
BOOST_REQUIRE_GE(semaphore.get_stats().reads_enqueued_for_memory, semaphore.get_stats().total_reads_killed_due_to_kill_limit);
|
||||
|
||||
// All failures must be caused by the kill limit triggering.
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().total_reads_killed_due_to_kill_limit, failed_reads);
|
||||
|
||||
return make_ready_future<>();
|
||||
}, std::move(db_cfg_ptr));
|
||||
}
|
||||
|
||||
// Verify that a blocking reader_permit::request_memory() call does not
// disturb the permit's externally observable state: the used/blocked permit
// counters must read the same before and after the call, both when the
// request is granted immediately and when it has to wait on the memory queue.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preserves_state) {
    const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
    const auto serialize_multiplier = 2;
    const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
    reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
            utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
    auto stop_sem = deferred_stop(semaphore);

    // Permit used to push consumption over the serialize limit; it becomes
    // the "blessed" permit, forcing the checked permit's request to queue.
    auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();

    // Running tally of the expected reads_enqueued_for_memory stat.
    uint64_t reads_enqueued_for_memory = 0;

    // Exercises request_memory() on `permit` and asserts the semaphore's
    // current/used/blocked permit stats stay at their expected values
    // (`used`, `blocked`) throughout — immediately-granted and queued
    // requests alike.
    auto do_check = [&] (reader_permit& permit, uint64_t used, uint64_t blocked, std::source_location sl) {
        testlog.info("do_check() {}:{}", sl.file_name(), sl.line());

        // Baseline before any memory request.
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 2);
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, used);
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, blocked);

        // Still below the serialize limit: granted immediately.
        auto units1 = permit.request_memory(1024).get();

        BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 2);
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, used);
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, blocked);

        // Push consumption over the serialize limit (2 * 2K memory = 4K).
        auto sponge_units = sponge_permit.request_memory(8 * 1024).get();

        // sponge permit is now the blessed one

        // Any other permit's request must now queue for memory.
        auto units2_fut = permit.request_memory(1024);
        BOOST_REQUIRE(!units2_fut.available());
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, ++reads_enqueued_for_memory);

        // Releasing the sponge's units lets the queued request complete.
        sponge_units.reset();
        auto units2 = units2_fut.get();

        // The wait must not have changed the permit's observable state.
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 2);
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, used);
        BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, blocked);
    };

    // unused
    {
        auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
        do_check(permit, 0, 0, std::source_location::current());
    }

    // used
    {
        auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
        reader_permit::used_guard ug{permit};
        do_check(permit, 1, 0, std::source_location::current());
    }

    // blocked
    {
        auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
        reader_permit::used_guard ug{permit};
        reader_permit::blocked_guard bg{permit};
        do_check(permit, 1, 1, std::source_location::current());
    }
}
|
||||
|
||||
@@ -31,6 +31,6 @@ custom_args:
|
||||
cql_query_test:
|
||||
- '-c2 -m2G --fail-on-abandoned-failed-futures=true'
|
||||
reader_concurrency_semaphore_test:
|
||||
- '-c1 -m1G'
|
||||
- '-c1 -m256M'
|
||||
run_in_debug:
|
||||
- logalloc_standard_allocator_segment_pool_backend_test
|
||||
|
||||
@@ -550,11 +550,8 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
}).get0();
|
||||
|
||||
// consume all units except what is needed to admit a single reader.
|
||||
const auto consumed_resources = sem.initial_resources() - reader_resources{1, replica::new_reader_base_cost};
|
||||
sem.consume(consumed_resources);
|
||||
auto release_resources = defer([&sem, consumed_resources] {
|
||||
sem.signal(consumed_resources);
|
||||
});
|
||||
auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout);
|
||||
auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost});
|
||||
|
||||
testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory);
|
||||
|
||||
|
||||
@@ -493,6 +493,12 @@ public:
|
||||
debug::the_database = nullptr;
|
||||
});
|
||||
auto cfg = cfg_in.db_config;
|
||||
if (!cfg->reader_concurrency_semaphore_serialize_limit_multiplier.is_set()) {
|
||||
cfg->reader_concurrency_semaphore_serialize_limit_multiplier.set(std::numeric_limits<uint32_t>::max());
|
||||
}
|
||||
if (!cfg->reader_concurrency_semaphore_kill_limit_multiplier.is_set()) {
|
||||
cfg->reader_concurrency_semaphore_kill_limit_multiplier.set(std::numeric_limits<uint32_t>::max());
|
||||
}
|
||||
tmpdir data_dir;
|
||||
auto data_dir_path = data_dir.path().string();
|
||||
if (!cfg->data_file_directories.is_set()) {
|
||||
|
||||
@@ -97,6 +97,9 @@ def make_scylla_conf(workdir: pathlib.Path, host_addr: str, seed_addrs: List[str
|
||||
|
||||
'permissions_update_interval_in_ms': 100,
|
||||
'permissions_validity_in_ms': 100,
|
||||
|
||||
'reader_concurrency_semaphore_serialize_limit_multiplier': 0,
|
||||
'reader_concurrency_semaphore_kill_limit_multiplier': 0,
|
||||
}
|
||||
|
||||
# Seastar options can not be passed through scylla.yaml, use command line
|
||||
|
||||
Reference in New Issue
Block a user