diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 018aeef632..3beefa4c7f 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -340,6 +340,18 @@ void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept } reader_concurrency_semaphore::inactive_read::~inactive_read() { + detach(); +} + +void reader_concurrency_semaphore::inactive_read::detach() noexcept { + if (handle) { + handle->_irp = nullptr; + handle = nullptr; + } +} + +void reader_concurrency_semaphore::inactive_read_handle::abandon() noexcept { + delete std::exchange(_irp, nullptr); } void reader_concurrency_semaphore::signal(const resources& r) noexcept { @@ -385,7 +397,7 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore: auto& ir = *irp; _inactive_reads.push_back(ir); ++_stats.inactive_reads; - return inactive_read_handle(*this, std::move(irp)); + return inactive_read_handle(*this, *irp.release()); } catch (...) { // It is okay to swallow the exception since // we're allowed to drop the reader upon registration @@ -427,8 +439,7 @@ flat_mutation_reader_opt reader_concurrency_semaphore::unregister_inactive_read( } --_stats.inactive_reads; - auto irp = std::move(irh._irp); - irp->unlink(); + std::unique_ptr irp(irh._irp); return std::move(irp->reader); } @@ -441,13 +452,11 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read(evict_reason reas } void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason) noexcept { - auto reader = std::move(ir.reader); - ir.unlink(); + ir.detach(); + std::unique_ptr irp(&ir); try { - if (auto notify_handler = std::move(ir.notify_handler)) { - notify_handler(reason); - // The notify_handler may destroy the inactive_read. - // Do not use it after this point! + if (ir.notify_handler) { + ir.notify_handler(reason); } } catch (...) { rcslog.error("[semaphore {}] evict(): notify handler failed for inactive read evicted due to {}: {}", _name, reason, std::current_exception()); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 53da839acf..752b586e6e 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -81,6 +81,8 @@ public: struct permit_list; + class inactive_read_handle; + private: struct entry { promise pr; @@ -102,12 +104,13 @@ private: flat_mutation_reader reader; eviction_notify_handler notify_handler; timer ttl_timer; + inactive_read_handle* handle = nullptr; explicit inactive_read(flat_mutation_reader reader_) noexcept : reader(std::move(reader_)) { } - inactive_read(inactive_read&&) = default; ~inactive_read(); + void detach() noexcept; }; using inactive_reads_type = bi::list>; @@ -115,24 +118,41 @@ private: public: class inactive_read_handle { reader_concurrency_semaphore* _sem = nullptr; - std::unique_ptr _irp; + inactive_read* _irp = nullptr; friend class reader_concurrency_semaphore; - explicit inactive_read_handle(reader_concurrency_semaphore& sem, std::unique_ptr irp) noexcept - : _sem(&sem), _irp(std::move(irp)) { + private: + void abandon() noexcept; + + explicit inactive_read_handle(reader_concurrency_semaphore& sem, inactive_read& ir) noexcept + : _sem(&sem), _irp(&ir) { + _irp->handle = this; } public: inactive_read_handle() = default; inactive_read_handle(inactive_read_handle&& o) noexcept : _sem(std::exchange(o._sem, nullptr)) - , _irp(std::move(o._irp)) { + , _irp(std::exchange(o._irp, nullptr)) { + if (_irp) { + _irp->handle = this; + } } inactive_read_handle& operator=(inactive_read_handle&& o) noexcept { + if (this == &o) { + return *this; + } + abandon(); _sem = std::exchange(o._sem, nullptr); - _irp = std::move(o._irp); + _irp = std::exchange(o._irp, nullptr); + if (_irp) { + _irp->handle = this; + } return *this; } + ~inactive_read_handle() { + abandon(); + } explicit operator bool() const noexcept { return bool(_irp); }