db+semaphores+tests: mandatory `name' param in reader_concurrency_semaphore

Exception messages contain semaphore's name (provided in ctor).
This affects the queue overflow exception as well as timeout
exception. Also, custom throwing function in ctor was changed
to `prethrow_action', i.e. metrics can still be updated there but
now callers have no control over the type of the exception being
thrown. This affected `restricted_reader_max_queue_length' test.
`reader_concurrency_semaphore'-s docs are updated accordingly.
This commit is contained in:
Juliusz Stasiewicz
2019-11-28 13:58:17 +01:00
parent fa12394dfe
commit d043393f52
4 changed files with 49 additions and 36 deletions

View File

@@ -199,19 +199,21 @@ database::database(const db::config& cfg, database_config dbcfg)
}))
, _read_concurrency_sem(max_count_concurrent_reads,
max_memory_concurrent_reads(),
"_read_concurrency_sem",
max_inactive_queue_length(),
[this] {
++_stats->sstable_read_queue_overloaded;
return std::make_exception_ptr(std::runtime_error("sstable read queue overloaded"));
})
// No timeouts or queue length limits - a failure here can kill an entire repair.
// Trust the caller to limit concurrency.
, _streaming_concurrency_sem(
max_count_streaming_concurrent_reads,
max_memory_streaming_concurrent_reads())
max_memory_streaming_concurrent_reads(),
"_streaming_concurrency_sem")
, _system_read_concurrency_sem(
max_count_system_concurrent_reads,
max_memory_system_concurrent_reads())
max_memory_system_concurrent_reads(),
"_system_read_concurrency_sem")
, _data_query_stage("data_query", &column_family::query)
, _mutation_query_stage()
, _apply_stage("db_apply", &database::do_apply)

View File

@@ -20,6 +20,7 @@
*/
#include <seastar/core/reactor.hh>
#include <seastar/core/print.hh>
#include "reader_concurrency_semaphore.hh"
@@ -77,7 +78,12 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read() {
future<lw_shared_ptr<reader_concurrency_semaphore::reader_permit>> reader_concurrency_semaphore::wait_admission(size_t memory,
db::timeout_clock::time_point timeout) {
if (_wait_list.size() >= _max_queue_length) {
return make_exception_future<lw_shared_ptr<reader_permit>>(_make_queue_overloaded_exception());
if (_prethrow_action) {
_prethrow_action();
}
return make_exception_future<lw_shared_ptr<reader_permit>>(
std::make_exception_ptr(std::runtime_error(
format("{}: restricted mutation reader queue overload", _name))));
}
auto r = resources(1, static_cast<ssize_t>(memory));
auto it = _inactive_reads.begin();

View File

@@ -39,16 +39,16 @@
/// construction. New readers will only be admitted when there is both
/// enough count and memory units available. Readers are admitted in
/// FIFO order.
/// It's possible to specify the maximum allowed number of waiting
/// Semaphore's `name` must be provided in ctor and its only purpose is
/// to increase readability of exceptions: both timeout exceptions and
/// queue overflow exceptions (read below) include this `name` in messages.
/// It's also possible to specify the maximum allowed number of waiting
/// readers by the `max_queue_length` constructor parameter. When the
/// number waiting readers would be equal or greater than this number
/// (when calling `wait_admission()`) an exception will be thrown.
/// The type of the exception and optionally some additional code
/// that should be executed when this happens can be customized by the
/// `raise_queue_overloaded_exception` constructor parameter. This
/// function will be called every time the queue limit is surpassed.
/// It is expected to return an `std::exception_ptr` that will be
/// injected into the future.
/// number of waiting readers becomes equal or greater than
/// `max_queue_length` (upon calling `wait_admission()`) an exception of
/// type `std::runtime_error` is thrown. Optionally, some additional
/// code can be executed just before throwing (`prethrow_action`
/// constructor parameter).
class reader_concurrency_semaphore {
public:
struct resources {
@@ -151,9 +151,14 @@ private:
resources res;
entry(promise<lw_shared_ptr<reader_permit>>&& pr, resources r) : pr(std::move(pr)), res(r) {}
};
struct expiry_handler {
class expiry_handler {
sstring _semaphore_name;
public:
explicit expiry_handler(sstring semaphore_name)
: _semaphore_name(std::move(semaphore_name)) {}
void operator()(entry& e) noexcept {
e.pr.set_exception(semaphore_timed_out());
e.pr.set_exception(named_semaphore_timed_out(_semaphore_name));
}
};
@@ -162,17 +167,14 @@ private:
expiring_fifo<entry, expiry_handler, db::timeout_clock> _wait_list;
sstring _name;
size_t _max_queue_length = std::numeric_limits<size_t>::max();
std::function<std::exception_ptr()> _make_queue_overloaded_exception;
std::function<void()> _prethrow_action;
uint64_t _next_id = 1;
std::map<uint64_t, std::unique_ptr<inactive_read>> _inactive_reads;
inactive_read_stats _inactive_read_stats;
private:
static std::exception_ptr default_make_queue_overloaded_exception() {
return std::make_exception_ptr(std::runtime_error("restricted mutation reader queue overload"));
}
bool has_available_units(const resources& r) const {
return bool(_resources) && _resources >= r;
}
@@ -195,19 +197,23 @@ public:
reader_concurrency_semaphore(int count,
ssize_t memory,
sstring name,
size_t max_queue_length = std::numeric_limits<size_t>::max(),
std::function<std::exception_ptr()> raise_queue_overloaded_exception = default_make_queue_overloaded_exception)
std::function<void()> prethrow_action = nullptr)
: _resources(count, memory)
, _wait_list(expiry_handler(name))
, _name(std::move(name))
, _max_queue_length(max_queue_length)
, _make_queue_overloaded_exception(raise_queue_overloaded_exception) {
}
, _prethrow_action(std::move(prethrow_action)) {}
/// Create a semaphore with practically unlimited count and memory.
///
/// And conversely, no queue limit either.
explicit reader_concurrency_semaphore(no_limits)
: reader_concurrency_semaphore(std::numeric_limits<int>::max(), std::numeric_limits<ssize_t>::max()) {
}
: reader_concurrency_semaphore(
std::numeric_limits<int>::max(),
std::numeric_limits<ssize_t>::max(),
"unlimited reader_concurrency_semaphore") {}
reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete;
reader_concurrency_semaphore& operator=(const reader_concurrency_semaphore&) = delete;

View File

@@ -1079,7 +1079,7 @@ class dummy_file_impl : public file_impl {
SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
return async([&] {
reader_concurrency_semaphore semaphore(100, 4 * 1024);
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
// Testing the tracker here, no need to have a base cost.
auto permit = semaphore.wait_admission(0).get0();
@@ -1139,7 +1139,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
SEASTAR_TEST_CASE(restricted_reader_reading) {
return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
storage_service_for_tests ssft;
reader_concurrency_semaphore semaphore(2, new_reader_base_cost);
reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name());
{
simple_schema s;
@@ -1210,7 +1210,7 @@ SEASTAR_TEST_CASE(restricted_reader_reading) {
SEASTAR_TEST_CASE(restricted_reader_timeout) {
return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
storage_service_for_tests ssft;
reader_concurrency_semaphore semaphore(2, new_reader_base_cost);
reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name());
{
simple_schema s;
@@ -1256,9 +1256,7 @@ SEASTAR_TEST_CASE(restricted_reader_max_queue_length) {
return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
storage_service_for_tests ssft;
struct queue_overloaded_exception {};
reader_concurrency_semaphore semaphore(2, new_reader_base_cost, 2, [] { return std::make_exception_ptr(queue_overloaded_exception()); });
reader_concurrency_semaphore semaphore(2, new_reader_base_cost, get_name(), 2);
{
simple_schema s;
@@ -1279,7 +1277,7 @@ SEASTAR_TEST_CASE(restricted_reader_max_queue_length) {
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 2);
// The queue should now be full.
BOOST_REQUIRE_THROW(reader4().get(), queue_overloaded_exception);
BOOST_REQUIRE_THROW(reader4().get(), std::runtime_error);
reader1_ptr.reset();
read2_fut.get();
@@ -1294,7 +1292,7 @@ SEASTAR_TEST_CASE(restricted_reader_max_queue_length) {
SEASTAR_TEST_CASE(restricted_reader_create_reader) {
return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
storage_service_for_tests ssft;
reader_concurrency_semaphore semaphore(100, new_reader_base_cost);
reader_concurrency_semaphore semaphore(100, new_reader_base_cost, get_name());
{
simple_schema s;
@@ -1327,8 +1325,8 @@ SEASTAR_TEST_CASE(restricted_reader_create_reader) {
}
SEASTAR_TEST_CASE(test_restricted_reader_as_mutation_source) {
return seastar::async([] {
reader_concurrency_semaphore semaphore(100, 10 * new_reader_base_cost);
return seastar::async([test_name = get_name()] {
reader_concurrency_semaphore semaphore(100, 10 * new_reader_base_cost, test_name);
auto make_restricted_populator = [&semaphore](schema_ptr s, const std::vector<mutation> &muts) {
auto mt = make_lw_shared<memtable>(s);
@@ -2067,7 +2065,8 @@ public:
const auto shard = engine().cpu_id();
if (!_contexts[shard].semaphore) {
if (_evict_paused_readers) {
_contexts[shard].semaphore = make_foreign(std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max()));
_contexts[shard].semaphore = make_foreign(std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max(),
format("reader_concurrency_semaphore @shard_id={}", shard)));
// Add a waiter, so that all registered inactive reads are
// immediately evicted.
// We don't care about the returned future.