Merge "register_inactive_read: error handling" from Benny

"
Currently, register_inactive_read accepts an eviction_notify_handler
to be called when the inactive_read is evicted.

However, in case there was an error in register_inactive_read
the notification function isn't called leaving behind
state that needs to be cleaned up.

This series separates the register_inactive_read interface
into 2 parts:

1. register_inactive_read(flat_mutation_reader) - which just registers
the reader and returns an inactive_read_handle, *if permitted*.
Otherwise, the notification handler is not called (it is not known yet)
and the caller is not expected to do anything fancy at this point
that will require cleanup.

This optimizes the server when overloaded since we do less work
that we'd need to undo in case the reader_concurrency_semaphore
runs out of resources.

2. After register_inactive_read successfully returns a valid
inactive_read_handle, the caller sets up its local state
and may call `set_notify_handler` to set the optional
notify_handler and ttl on the i_r_h.

After this step, the notify_handler will be called when
the inactive_read is evicted, for any reason.

querier_cache::insert_querier was modified to use the
above procedure and to handle (and log/ignore) any error
in the process.

inactive_read_handle and inactive_read keeping track of each other
was simplified by keeping an iterator in the handle and a backpointer
in the inactive_read object.  The former is used to evict the reader
and to set the notify_handler and/or ttl without having to lookup the i_r.
The latter is used to invalidate the i_r_h when the i_r is destroyed.

Test: unit(release), querier_cache_test(debug)
"

* tag 'register_inactive_read-error-handling-v6' of github.com:bhalevy/scylla:
  querier_cache: insert_querier: ignore errors to register inactive reader
  querier_cache: insert_querier: handle errors
  querier_utils: mark functions noexcept
  reader_concurrency_semaphore: register_inactive_read: make noexcept
  reader_concurrency_semaphore: separate set_notify_handler from register_inactive_reader
  reader_concurrency_semaphore: inactive_read: make ttl_timer non-optional
  reader_concurrency_semaphore: inactive_read: use intrusive list
  reader_concurrency_semaphore: do_wait_admission: use try_evict_one_inactive_read
  reader_concurrency_semaphore: try_evict_one_inactive_read: pass evict_reason
  reader_concurrency_semaphore: unregister_inactive_read: calling on wrong semaphore is an internal error
  reader_concurrency_semaphore: unregister_inactive_read: do nothing if disengaged
  reader_concurrency_semaphore: inactive_read_handle: swap definition order
  reader_lifecycle_policy: retire low level try_resume method
  reader_concurrency_semaphore: inactive_read: keep a flat_mutation_reader
This commit is contained in:
Avi Kivity
2021-02-10 19:09:21 +02:00
7 changed files with 149 additions and 116 deletions

View File

@@ -459,7 +459,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
return _db.invoke_on(shard, [this, query_uuid = _cmd.query_uuid, query_ranges = _ranges, &rm,
&last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable {
try {
flat_mutation_reader_opt reader = try_resume(rm.rparts->permit.semaphore(), std::move(*rm.handle));
flat_mutation_reader_opt reader = rm.rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle));
if (!reader) {
return make_ready_future<>();

View File

@@ -1094,11 +1094,7 @@ void evictable_reader::maybe_pause(flat_mutation_reader reader) {
}
flat_mutation_reader_opt evictable_reader::try_resume() {
auto ir_ptr = _permit.semaphore().unregister_inactive_read(std::move(_irh));
if (!ir_ptr) {
return {};
}
return std::move(*ir_ptr);
return _permit.semaphore().unregister_inactive_read(std::move(_irh));
}
void evictable_reader::update_next_position(flat_mutation_reader& reader) {
@@ -1930,15 +1926,6 @@ reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_
return sem.register_inactive_read(std::move(reader));
}
flat_mutation_reader_opt
reader_lifecycle_policy::try_resume(reader_concurrency_semaphore& sem, reader_concurrency_semaphore::inactive_read_handle irh) {
auto ir_ptr = sem.unregister_inactive_read(std::move(irh));
if (!ir_ptr) {
return {};
}
return std::move(*ir_ptr);
}
reader_concurrency_semaphore::inactive_read_handle
reader_lifecycle_policy::pause(flat_mutation_reader reader) {
return pause(semaphore(), std::move(reader));
@@ -1946,7 +1933,7 @@ reader_lifecycle_policy::pause(flat_mutation_reader reader) {
flat_mutation_reader_opt
reader_lifecycle_policy::try_resume(reader_concurrency_semaphore::inactive_read_handle irh) {
return try_resume(semaphore(), std::move(irh));
return semaphore().unregister_inactive_read(std::move(irh));
}
flat_mutation_reader make_multishard_combining_reader(

View File

@@ -444,7 +444,6 @@ protected:
// Helpers for implementations, who might wish to provide the semaphore in
// other ways than through the official `semaphore()` override.
static reader_concurrency_semaphore::inactive_read_handle pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader);
static flat_mutation_reader_opt try_resume(reader_concurrency_semaphore& sem, reader_concurrency_semaphore::inactive_read_handle irh);
public:
/// Create an appropriate reader on the shard it is called on.

View File

@@ -22,11 +22,14 @@
#include "querier.hh"
#include "schema.hh"
#include "log.hh"
#include <boost/range/adaptor/map.hpp>
namespace query {
logging::logger qlogger("querier_cache");
enum class can_use {
yes,
no_schema_version_mismatch,
@@ -211,16 +214,16 @@ querier_cache::querier_cache(std::chrono::seconds entry_ttl)
}
struct querier_utils {
static flat_mutation_reader get_reader(querier_base& q) {
static flat_mutation_reader get_reader(querier_base& q) noexcept {
return std::move(std::get<flat_mutation_reader>(q._reader));
}
static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) {
static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) noexcept {
return std::move(std::get<reader_concurrency_semaphore::inactive_read_handle>(q._reader));
}
static void set_reader(querier_base& q, flat_mutation_reader r) {
static void set_reader(querier_base& q, flat_mutation_reader r) noexcept {
q._reader = std::move(r);
}
static void set_inactive_read_handle(querier_base& q, reader_concurrency_semaphore::inactive_read_handle h) {
static void set_inactive_read_handle(querier_base& q, reader_concurrency_semaphore::inactive_read_handle h) noexcept {
q._reader = std::move(h);
}
};
@@ -247,9 +250,22 @@ static void insert_querier(
auto& sem = q.permit().semaphore();
auto irh = sem.register_inactive_read(querier_utils::get_reader(q));
if (!irh) {
return;
}
try {
auto cleanup_irh = defer([&] {
sem.unregister_inactive_read(std::move(irh));
});
auto it = index.emplace(key, std::make_unique<Querier>(std::move(q)));
++stats.population;
auto cleanup_index = defer([&] {
index.erase(it);
--stats.population;
});
auto notify_handler = [&stats, &index, it] (reader_concurrency_semaphore::evict_reason reason) {
index.erase(it);
@@ -266,9 +282,17 @@ static void insert_querier(
--stats.population;
};
if (auto irh = sem.register_inactive_read(querier_utils::get_reader(*it->second), ttl, std::move(notify_handler))) {
querier_utils::set_inactive_read_handle(*it->second, std::move(irh));
}
sem.set_notify_handler(irh, std::move(notify_handler), ttl);
querier_utils::set_inactive_read_handle(*it->second, std::move(irh));
cleanup_index.cancel();
cleanup_irh.cancel();
} catch (...) {
// It is okay to swallow the exception since
// we're allowed to drop the reader upon registration
// due to lack of resources - in which case we already
// drop the querier.
qlogger.warn("Failed to insert querier into index: {}. Ignored as if it was evicted upon registration", std::current_exception());
}
}
void querier_cache::insert(utils::UUID key, data_querier&& q, tracing::trace_state_ptr trace_state) {
@@ -304,11 +328,11 @@ static std::optional<Querier> lookup_querier(
throw std::runtime_error("lookup_querier(): found querier is not of the expected type");
}
auto& q = *q_ptr;
auto read_ptr = q.permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(q));
if (!read_ptr) {
auto reader_opt = q.permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(q));
if (!reader_opt) {
throw std::runtime_error("lookup_querier(): found querier that is evicted");
}
querier_utils::set_reader(q, std::move(*read_ptr.get()));
querier_utils::set_reader(q, std::move(*reader_opt));
--stats.population;
const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice);

View File

@@ -339,10 +339,6 @@ void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept
maybe_dump_reader_permit_diagnostics(_semaphore, *_semaphore._permit_list, "timed out");
}
reader_concurrency_semaphore::inactive_read::inactive_read(flat_mutation_reader reader)
: reader(std::make_unique<flat_mutation_reader>(std::move(reader))) {
}
reader_concurrency_semaphore::inactive_read::~inactive_read() {
}
@@ -380,42 +376,47 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() {
broken(std::make_exception_ptr(broken_semaphore{}));
}
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader reader,
eviction_notify_handler notify_handler) {
return register_inactive_read(std::move(reader), std::chrono::duration_values<std::chrono::seconds>::max(), std::move(notify_handler));
}
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader reader,
std::chrono::seconds ttl, eviction_notify_handler notify_handler) {
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader reader) noexcept {
// Implies _inactive_reads.empty(), we don't queue new readers before
// evicting all inactive reads.
if (_wait_list.empty()) {
inactive_read ir(std::move(reader));
ir.notify_handler = std::move(notify_handler);
const auto [it, _] = _inactive_reads.emplace(_next_id++, std::move(ir));
(void)_;
if (ttl != std::chrono::duration_values<std::chrono::seconds>::max()) {
it->second.ttl_timer.emplace([this, it = it] {
evict(it, evict_reason::time);
});
it->second.ttl_timer->arm(lowres_clock::now() + ttl);
}
try {
auto irp = std::make_unique<inactive_read>(std::move(reader));
auto& ir = *irp;
_inactive_reads.push_back(ir);
++_stats.inactive_reads;
return inactive_read_handle(*this, it->first);
return inactive_read_handle(*this, std::move(irp));
} catch (...) {
// It is okay to swallow the exception since
// we're allowed to drop the reader upon registration
// due to lack of resources. Returning an empty
// i_r_h here rather than throwing simplifies the caller's
// error handling.
rcslog.warn("Registering inactive read failed: {}. Ignored as if it was evicted.", std::current_exception());
}
} else {
++_stats.permit_based_evictions;
}
// The evicted reader will release its permit, hopefully allowing us to
// admit some readers from the _wait_list.
if (notify_handler) {
notify_handler(evict_reason::permit);
}
++_stats.permit_based_evictions;
return inactive_read_handle();
}
std::unique_ptr<flat_mutation_reader> reader_concurrency_semaphore::unregister_inactive_read(inactive_read_handle irh) {
if (irh && irh._sem != this) {
throw std::runtime_error(fmt::format(
void reader_concurrency_semaphore::set_notify_handler(inactive_read_handle& irh, eviction_notify_handler&& notify_handler, std::optional<std::chrono::seconds> ttl_opt) {
auto& ir = *irh._irp;
ir.notify_handler = std::move(notify_handler);
if (ttl_opt) {
ir.ttl_timer.set_callback([this, &ir] {
evict(ir, evict_reason::time);
});
ir.ttl_timer.arm(lowres_clock::now() + *ttl_opt);
}
}
flat_mutation_reader_opt reader_concurrency_semaphore::unregister_inactive_read(inactive_read_handle irh) {
if (!irh) {
return {};
}
if (irh._sem != this) {
on_internal_error(rcslog, fmt::format(
"reader_concurrency_semaphore::unregister_inactive_read(): "
"attempted to unregister an inactive read with a handle belonging to another semaphore: "
"this is {} (0x{:x}) but the handle belongs to {} (0x{:x})",
@@ -425,27 +426,27 @@ std::unique_ptr<flat_mutation_reader> reader_concurrency_semaphore::unregister_i
reinterpret_cast<uintptr_t>(irh._sem)));
}
if (auto it = _inactive_reads.find(irh._id); it != _inactive_reads.end()) {
auto ir = std::move(it->second);
_inactive_reads.erase(it);
--_stats.inactive_reads;
return std::move(ir.reader);
}
return {};
--_stats.inactive_reads;
auto irp = std::move(irh._irp);
irp->unlink();
return std::move(irp->reader);
}
bool reader_concurrency_semaphore::try_evict_one_inactive_read() {
bool reader_concurrency_semaphore::try_evict_one_inactive_read(evict_reason reason) {
if (_inactive_reads.empty()) {
return false;
}
evict(_inactive_reads.begin(), evict_reason::manual);
evict(_inactive_reads.front(), reason);
return true;
}
reader_concurrency_semaphore::inactive_reads_type::iterator reader_concurrency_semaphore::evict(inactive_reads_type::iterator it, evict_reason reason) {
auto ir = std::move(it->second);
if (ir.notify_handler) {
ir.notify_handler(reason);
void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason) {
auto reader = std::move(ir.reader);
ir.unlink();
if (auto notify_handler = std::move(ir.notify_handler)) {
notify_handler(reason);
// The notify_handler may destroy the inactive_read.
// Do not use it after this point!
}
switch (reason) {
case evict_reason::permit:
@@ -458,7 +459,6 @@ reader_concurrency_semaphore::inactive_reads_type::iterator reader_concurrency_s
break;
}
--_stats.inactive_reads;
return _inactive_reads.erase(it);
}
bool reader_concurrency_semaphore::has_available_units(const resources& r) const {
@@ -484,9 +484,10 @@ future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admi
format("{}: restricted mutation reader queue overload", _name))));
}
auto r = resources(1, static_cast<ssize_t>(memory));
auto it = _inactive_reads.begin();
while (!may_proceed(r) && it != _inactive_reads.end()) {
it = evict(it, evict_reason::permit);
while (!may_proceed(r)) {
if (!try_evict_one_inactive_read(evict_reason::permit)) {
break;
}
}
if (may_proceed(r)) {
permit.on_admission();

View File

@@ -21,14 +21,15 @@
#pragma once
#include <map>
#include <boost/intrusive/list.hpp>
#include <seastar/core/future.hh>
#include "reader_permit.hh"
#include "flat_mutation_reader.hh"
namespace bi = boost::intrusive;
using namespace seastar;
class flat_mutation_reader;
/// Specific semaphore for controlling reader concurrency
///
/// Use `make_permit()` to create a permit to track the resource consumption
@@ -63,30 +64,6 @@ public:
using eviction_notify_handler = noncopyable_function<void(evict_reason)>;
class inactive_read_handle {
reader_concurrency_semaphore* _sem = nullptr;
uint64_t _id = 0;
friend class reader_concurrency_semaphore;
explicit inactive_read_handle(reader_concurrency_semaphore& sem, uint64_t id) noexcept
: _sem(&sem), _id(id) {
}
public:
inactive_read_handle() = default;
inactive_read_handle(inactive_read_handle&& o) noexcept
: _sem(std::exchange(o._sem, nullptr)), _id(std::exchange(o._id, 0)) {
}
inactive_read_handle& operator=(inactive_read_handle&& o) noexcept {
_sem = std::exchange(o._sem, nullptr);
_id = std::exchange(o._id, 0);
return *this;
}
explicit operator bool() const noexcept {
return bool(_id);
}
};
struct stats {
// The number of inactive reads evicted to free up permits.
uint64_t permit_based_evictions = 0;
@@ -121,17 +98,45 @@ private:
void operator()(entry& e) noexcept;
};
struct inactive_read {
std::unique_ptr<flat_mutation_reader> reader;
struct inactive_read : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
flat_mutation_reader reader;
eviction_notify_handler notify_handler;
std::optional<timer<lowres_clock>> ttl_timer;
timer<lowres_clock> ttl_timer;
explicit inactive_read(flat_mutation_reader);
explicit inactive_read(flat_mutation_reader reader_) noexcept
: reader(std::move(reader_))
{ }
inactive_read(inactive_read&&) = default;
~inactive_read();
};
using inactive_reads_type = std::map<uint64_t, inactive_read>;
using inactive_reads_type = bi::list<inactive_read, bi::constant_time_size<false>>;
public:
class inactive_read_handle {
reader_concurrency_semaphore* _sem = nullptr;
std::unique_ptr<inactive_read> _irp;
friend class reader_concurrency_semaphore;
explicit inactive_read_handle(reader_concurrency_semaphore& sem, std::unique_ptr<inactive_read> irp) noexcept
: _sem(&sem), _irp(std::move(irp)) {
}
public:
inactive_read_handle() = default;
inactive_read_handle(inactive_read_handle&& o) noexcept
: _sem(std::exchange(o._sem, nullptr))
, _irp(std::move(o._irp)) {
}
inactive_read_handle& operator=(inactive_read_handle&& o) noexcept {
_sem = std::exchange(o._sem, nullptr);
_irp = std::move(o._irp);
return *this;
}
explicit operator bool() const noexcept {
return bool(_irp);
}
};
private:
const resources _initial_resources;
@@ -142,13 +147,12 @@ private:
sstring _name;
size_t _max_queue_length = std::numeric_limits<size_t>::max();
std::function<void()> _prethrow_action;
uint64_t _next_id = 1;
inactive_reads_type _inactive_reads;
stats _stats;
std::unique_ptr<permit_list> _permit_list;
private:
inactive_reads_type::iterator evict(inactive_reads_type::iterator it, evict_reason reason);
void evict(inactive_read&, evict_reason reason);
bool has_available_units(const resources& r) const;
@@ -190,26 +194,40 @@ public:
/// The semaphore will evict this read when there is a shortage of
/// permits. This might be immediate, during this register call.
/// Clients can use the returned handle to unregister the read, when it
/// stops being inactive and hence evictable.
/// stops being inactive and hence evictable, or to set the optional
/// notify_handler and ttl.
///
/// An inactive read is an object implementing the `inactive_read`
/// interface.
/// The semaphore takes ownership of the created object and destroys it if
/// it is evicted.
inactive_read_handle register_inactive_read(flat_mutation_reader ir, eviction_notify_handler handler = {});
inactive_read_handle register_inactive_read(flat_mutation_reader ir, std::chrono::seconds ttl, eviction_notify_handler handler = {});
inactive_read_handle register_inactive_read(flat_mutation_reader ir) noexcept;
/// Set the inactive read eviction notification handler and optionally eviction ttl.
///
/// The semaphore may evict this read when there is a shortage of
/// permits or after the given ttl expired.
///
/// The notification handler will be called when the inactive_read is evicted,
/// and is passed the reason for the eviction.
///
/// Note that the inactive_read might have already been evicted if
/// the caller may yield after the register_inactive_read returned the handle
/// and before calling set_notify_handler. In this case, the caller must revalidate
/// the inactive_read_handle before calling this function.
void set_notify_handler(inactive_read_handle& irh, eviction_notify_handler&& handler, std::optional<std::chrono::seconds> ttl);
/// Unregister the previously registered inactive read.
///
/// If the read was not evicted, the inactive read object, passed in to the
/// register call, will be returned. Otherwise a nullptr is returned.
std::unique_ptr<flat_mutation_reader> unregister_inactive_read(inactive_read_handle irh);
flat_mutation_reader_opt unregister_inactive_read(inactive_read_handle irh);
/// Try to evict an inactive read.
///
/// Return true if an inactive read was evicted and false otherwise
/// (if there was no reader to evict).
bool try_evict_one_inactive_read();
bool try_evict_one_inactive_read(evict_reason = evict_reason::manual);
void clear_inactive_reads() {
_inactive_reads.clear();

View File

@@ -724,7 +724,7 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
t.assert_cache_lookup_mutation_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice)
.misses()
.no_drops()
.resource_based_evictions();
.no_evictions();
resources.reset();
@@ -746,6 +746,10 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
// Sanity check that lookup still works with empty handle.
BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{}));
set_abort_on_internal_error(false);
auto reset_on_internal_abort = defer([] {
set_abort_on_internal_error(true);
});
BOOST_REQUIRE_THROW(sem1.unregister_inactive_read(std::move(sem2_h1)), std::runtime_error);
BOOST_REQUIRE_THROW(sem2.unregister_inactive_read(std::move(sem1_h1)), std::runtime_error);
}