From 581edc4e4eb52ceaa969c100e884ebbaeb424bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 12 Mar 2021 18:04:41 +0200 Subject: [PATCH] reader_concurrency_semaphore: make inactive_read_handle a weak reference Having the handle keep an owning reference to the inactive read lead to awkward situations, where the inactive read is destroyed during eviction in certain situations only (querier cache) and not in other cases. Although the users didn't notice anything from this, it lead to very brittle code inside the reader concurrency semaphore. Among others, the inactive read destructor has to be open coded in evict() which already lead to mistakes. This patch goes back to the weak pointer paradigm used a while ago, which is a much more natural fit for this. Inactive reads are still kept in an intrusive list in the semaphore but the handle now keeps a weak pointer to them. When destroyed the handler will destroy the inactive read if it is still alive. When evicting the inactive read, it will set the pointer in the handle to null. --- reader_concurrency_semaphore.cc | 27 ++++++++++++++++++--------- reader_concurrency_semaphore.hh | 32 ++++++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 018aeef632..3beefa4c7f 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -340,6 +340,18 @@ void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept } reader_concurrency_semaphore::inactive_read::~inactive_read() { + detach(); +} + +void reader_concurrency_semaphore::inactive_read::detach() noexcept { + if (handle) { + handle->_irp = nullptr; + handle = nullptr; + } +} + +void reader_concurrency_semaphore::inactive_read_handle::abandon() noexcept { + delete std::exchange(_irp, nullptr); } void reader_concurrency_semaphore::signal(const resources& r) noexcept { @@ -385,7 +397,7 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore: auto& ir = *irp; _inactive_reads.push_back(ir); ++_stats.inactive_reads; - return inactive_read_handle(*this, std::move(irp)); + return inactive_read_handle(*this, *irp.release()); } catch (...) { // It is okay to swallow the exception since // we're allowed to drop the reader upon registration @@ -427,8 +439,7 @@ flat_mutation_reader_opt reader_concurrency_semaphore::unregister_inactive_read( } --_stats.inactive_reads; - auto irp = std::move(irh._irp); - irp->unlink(); + std::unique_ptr irp(irh._irp); return std::move(irp->reader); } @@ -441,13 +452,11 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read(evict_reason reas } void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason) noexcept { - auto reader = std::move(ir.reader); - ir.unlink(); + ir.detach(); + std::unique_ptr irp(&ir); try { - if (auto notify_handler = std::move(ir.notify_handler)) { - notify_handler(reason); - // The notify_handler may destroy the inactive_read. - // Do not use it after this point! + if (ir.notify_handler) { + ir.notify_handler(reason); } } catch (...) { rcslog.error("[semaphore {}] evict(): notify handler failed for inactive read evicted due to {}: {}", _name, reason, std::current_exception()); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 53da839acf..752b586e6e 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -81,6 +81,8 @@ public: struct permit_list; + class inactive_read_handle; + private: struct entry { promise pr; @@ -102,12 +104,13 @@ private: flat_mutation_reader reader; eviction_notify_handler notify_handler; timer ttl_timer; + inactive_read_handle* handle = nullptr; explicit inactive_read(flat_mutation_reader reader_) noexcept : reader(std::move(reader_)) { } - inactive_read(inactive_read&&) = default; ~inactive_read(); + void detach() noexcept; }; using inactive_reads_type = bi::list>; @@ -115,24 +118,41 @@ private: public: class inactive_read_handle { reader_concurrency_semaphore* _sem = nullptr; - std::unique_ptr _irp; + inactive_read* _irp = nullptr; friend class reader_concurrency_semaphore; - explicit inactive_read_handle(reader_concurrency_semaphore& sem, std::unique_ptr irp) noexcept - : _sem(&sem), _irp(std::move(irp)) { + private: + void abandon() noexcept; + + explicit inactive_read_handle(reader_concurrency_semaphore& sem, inactive_read& ir) noexcept + : _sem(&sem), _irp(&ir) { + _irp->handle = this; } public: inactive_read_handle() = default; inactive_read_handle(inactive_read_handle&& o) noexcept : _sem(std::exchange(o._sem, nullptr)) - , _irp(std::move(o._irp)) { + , _irp(std::exchange(o._irp, nullptr)) { + if (_irp) { + _irp->handle = this; + } } inactive_read_handle& operator=(inactive_read_handle&& o) noexcept { + if (this == &o) { + return *this; + } + abandon(); _sem = std::exchange(o._sem, nullptr); - _irp = std::move(o._irp); + _irp = std::exchange(o._irp, nullptr); + if (_irp) { + _irp->handle = this; + } return *this; } + ~inactive_read_handle() { + abandon(); + } explicit operator bool() const noexcept { return bool(_irp); }