Files
scylladb/reader_concurrency_semaphore.hh
Juliusz Stasiewicz d043393f52 db+semaphores+tests: mandatory `name' param in reader_concurrency_semaphore
Exception messages contain semaphore's name (provided in ctor).
This affects the queue overflow exception as well as timeout
exception. Also, custom throwing function in ctor was changed
to `prethrow_action', i.e. metrics can still be updated there but
now callers have no control over the type of the exception being
thrown. This affected `restricted_reader_max_queue_length' test.
`reader_concurrency_semaphore'-s docs are updated accordingly.
2019-12-03 15:41:34 +01:00

293 lines
9.5 KiB
C++

/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* Copyright (C) 2017 ScyllaDB
*/
#pragma once
#include <map>
#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include "db/timeout_clock.hh"
#include "seastarx.hh"
/// Specific semaphore for controlling reader concurrency
///
/// Before creating a reader one should obtain a permit by calling
/// `wait_admission()`. This permit can then be used for tracking the
/// reader's memory consumption via `reader_resource_tracker`.
/// The permit should be held onto for the lifetime of the reader
/// and/or any buffer its tracking.
/// Reader concurrency is dual limited by count and memory.
/// The semaphore can be configured with the desired limits on
/// construction. New readers will only be admitted when there is both
/// enough count and memory units available. Readers are admitted in
/// FIFO order.
/// Semaphore's `name` must be provided in ctor and its only purpose is
/// to increase readability of exceptions: both timeout exceptions and
/// queue overflow exceptions (read below) include this `name` in messages.
/// It's also possible to specify the maximum allowed number of waiting
/// readers by the `max_queue_length` constructor parameter. When the
/// number of waiting readers becomes equal or greater than
/// `max_queue_length` (upon calling `wait_admission()`) an exception of
/// type `std::runtime_error` is thrown. Optionally, some additional
/// code can be executed just before throwing (`prethrow_action`
/// constructor parameter).
class reader_concurrency_semaphore {
public:
struct resources {
int count = 0;
ssize_t memory = 0;
resources() = default;
resources(int count, ssize_t memory)
: count(count)
, memory(memory) {
}
bool operator>=(const resources& other) const {
return count >= other.count && memory >= other.memory;
}
resources& operator-=(const resources& other) {
count -= other.count;
memory -= other.memory;
return *this;
}
resources& operator+=(const resources& other) {
count += other.count;
memory += other.memory;
return *this;
}
explicit operator bool() const {
return count >= 0 && memory >= 0;
}
};
class reader_permit {
reader_concurrency_semaphore& _semaphore;
const resources _base_cost;
public:
reader_permit(reader_concurrency_semaphore& semaphore, resources base_cost)
: _semaphore(semaphore)
, _base_cost(base_cost) {
}
~reader_permit() {
_semaphore.signal(_base_cost);
}
reader_permit(const reader_permit&) = delete;
reader_permit& operator=(const reader_permit&) = delete;
reader_permit(reader_permit&& other) = delete;
reader_permit& operator=(reader_permit&& other) = delete;
void consume_memory(size_t memory) {
_semaphore.consume_memory(memory);
}
void signal_memory(size_t memory) {
_semaphore.signal_memory(memory);
}
};
class inactive_read {
public:
virtual void evict() = 0;
virtual ~inactive_read() = default;
};
class inactive_read_handle {
uint64_t _id = 0;
friend class reader_concurrency_semaphore;
explicit inactive_read_handle(uint64_t id)
: _id(id) {
}
public:
inactive_read_handle() = default;
inactive_read_handle(inactive_read_handle&& o) : _id(std::exchange(o._id, 0)) {
}
inactive_read_handle& operator=(inactive_read_handle&& o) {
_id = std::exchange(o._id, 0);
return *this;
}
explicit operator bool() const {
return bool(_id);
}
};
struct inactive_read_stats {
// The number of inactive reads evicted to free up permits.
uint64_t permit_based_evictions = 0;
// The number of inactive reads currently registered.
uint64_t population = 0;
};
private:
struct entry {
promise<lw_shared_ptr<reader_permit>> pr;
resources res;
entry(promise<lw_shared_ptr<reader_permit>>&& pr, resources r) : pr(std::move(pr)), res(r) {}
};
class expiry_handler {
sstring _semaphore_name;
public:
explicit expiry_handler(sstring semaphore_name)
: _semaphore_name(std::move(semaphore_name)) {}
void operator()(entry& e) noexcept {
e.pr.set_exception(named_semaphore_timed_out(_semaphore_name));
}
};
private:
resources _resources;
expiring_fifo<entry, expiry_handler, db::timeout_clock> _wait_list;
sstring _name;
size_t _max_queue_length = std::numeric_limits<size_t>::max();
std::function<void()> _prethrow_action;
uint64_t _next_id = 1;
std::map<uint64_t, std::unique_ptr<inactive_read>> _inactive_reads;
inactive_read_stats _inactive_read_stats;
private:
bool has_available_units(const resources& r) const {
return bool(_resources) && _resources >= r;
}
bool may_proceed(const resources& r) const {
return has_available_units(r) && _wait_list.empty();
}
void consume_memory(size_t memory) {
_resources.memory -= memory;
}
void signal(const resources& r);
void signal_memory(size_t memory) {
signal(resources(0, static_cast<ssize_t>(memory)));
}
public:
struct no_limits { };
reader_concurrency_semaphore(int count,
ssize_t memory,
sstring name,
size_t max_queue_length = std::numeric_limits<size_t>::max(),
std::function<void()> prethrow_action = nullptr)
: _resources(count, memory)
, _wait_list(expiry_handler(name))
, _name(std::move(name))
, _max_queue_length(max_queue_length)
, _prethrow_action(std::move(prethrow_action)) {}
/// Create a semaphore with practically unlimited count and memory.
///
/// And conversely, no queue limit either.
explicit reader_concurrency_semaphore(no_limits)
: reader_concurrency_semaphore(
std::numeric_limits<int>::max(),
std::numeric_limits<ssize_t>::max(),
"unlimited reader_concurrency_semaphore") {}
reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete;
reader_concurrency_semaphore& operator=(const reader_concurrency_semaphore&) = delete;
reader_concurrency_semaphore(reader_concurrency_semaphore&&) = delete;
reader_concurrency_semaphore& operator=(reader_concurrency_semaphore&&) = delete;
/// Register an inactive read.
///
/// The semaphore will evict this read when there is a shortage of
/// permits. This might be immediate, during this register call.
/// Clients can use the returned handle to unregister the read, when it
/// stops being inactive and hence evictable.
///
/// An inactive read is an object implementing the `inactive_read`
/// interface.
/// The semaphore takes ownership of the created object and destroys it if
/// it is evicted.
inactive_read_handle register_inactive_read(std::unique_ptr<inactive_read> ir);
/// Unregister the previously registered inactive read.
///
/// If the read was not evicted, the inactive read object, passed in to the
/// register call, will be returned. Otherwise a nullptr is returned.
std::unique_ptr<inactive_read> unregister_inactive_read(inactive_read_handle irh);
/// Try to evict an inactive read.
///
/// Return true if an inactive read was evicted and false otherwise
/// (if there was no reader to evict).
bool try_evict_one_inactive_read();
void clear_inactive_reads() {
_inactive_reads.clear();
}
const inactive_read_stats& get_inactive_read_stats() const {
return _inactive_read_stats;
}
future<lw_shared_ptr<reader_permit>> wait_admission(size_t memory, db::timeout_clock::time_point timeout = db::no_timeout);
/// Consume the specific amount of resources without waiting.
lw_shared_ptr<reader_permit> consume_resources(resources r);
const resources available_resources() const {
return _resources;
}
size_t waiters() const {
return _wait_list.size();
}
};
class reader_resource_tracker {
lw_shared_ptr<reader_concurrency_semaphore::reader_permit> _permit;
public:
reader_resource_tracker() = default;
explicit reader_resource_tracker(lw_shared_ptr<reader_concurrency_semaphore::reader_permit> permit)
: _permit(std::move(permit)) {
}
bool operator==(const reader_resource_tracker& other) const {
return _permit == other._permit;
}
file track(file f) const;
lw_shared_ptr<reader_concurrency_semaphore::reader_permit> get_permit() const {
return _permit;
}
};
inline reader_resource_tracker no_resource_tracking() {
return {};
}