diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 208349b8ad..55c584fb1a 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -168,6 +168,7 @@ struct reader_concurrency_semaphore::permit_list { using list_type = boost::intrusive::list>; list_type permits; + permit_stats stats; }; reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name) @@ -409,7 +410,17 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na std::move(name)) {} reader_concurrency_semaphore::~reader_concurrency_semaphore() { - assert(_stopped); + if (!_permit_list->stats.total_permits) { + // We allow destroy without stop() when the semaphore wasn't used at all yet. + return; + } + if (!_stopped) { + on_internal_error_noexcept(rcslog, format("~reader_concurrency_semaphore(): semaphore {} not stopped before destruction", _name)); + // With the below conditions, we can get away with the semaphore being + // unstopped. In this case don't force an abort. + assert(_inactive_reads.empty() && !_close_readers_gate.get_count() && !_permit_gate.get_count()); + broken(); + } } reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader reader) noexcept { @@ -605,6 +616,7 @@ future reader_concurrency_semaphore::do_wait_admi void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) { _permit_gate.enter(); _permit_list->permits.push_back(permit); + ++_permit_list->stats.total_permits; } void reader_concurrency_semaphore::on_permit_destroyed(reader_permit::impl& permit) noexcept { @@ -612,6 +624,10 @@ void reader_concurrency_semaphore::on_permit_destroyed(reader_permit::impl& perm _permit_gate.leave(); } +reader_concurrency_semaphore::permit_stats reader_concurrency_semaphore::get_permit_stats() const { + return _permit_list->stats; +} + reader_permit reader_concurrency_semaphore::make_permit(const schema* const schema, const char* const op_name) { return reader_permit(*this, schema, std::string_view(op_name)); } diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index b29681134e..510503a397 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -79,6 +79,10 @@ public: // Total number of reads rejected because the admission queue reached its max capacity uint64_t total_reads_shed_due_to_overload = 0; }; + struct permit_stats { + // Total number of permits created so far. + uint64_t total_permits = 0; + }; struct permit_list; @@ -282,6 +286,9 @@ public: return _stats; } + /// Return stats about the currently existing permits. + permit_stats get_permit_stats() const; + /// Make a permit /// /// The permit is associated with a schema, which is the schema of the table diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index a706a8983f..b10d004f6b 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -554,6 +554,12 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { } SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits) { + BOOST_TEST_MESSAGE("unused"); + { + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + // Checks for stop() should not be triggered. + } + BOOST_TEST_MESSAGE("0 permits"); { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());