diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 5e8196129b..9b2206f0be 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -78,7 +78,7 @@ class reader_permit::impl : public boost::intrusive::list_base_hook; -static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state, bool sort_by_memory) { +static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state) { struct permit_summary { const schema* s; std::string_view op_name; @@ -299,25 +281,17 @@ static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const pe } } - std::ranges::sort(permit_summaries, [sort_by_memory] (const permit_summary& a, const permit_summary& b) { - if (sort_by_memory) { - return a.memory < b.memory; - } else { - return a.count < b.count; - } + std::ranges::sort(permit_summaries, [] (const permit_summary& a, const permit_summary& b) { + return a.memory < b.memory; }); permit_stats total; - auto print_line = [&os, sort_by_memory] (auto col1, auto col2, auto col3) { - if (sort_by_memory) { - fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3); - } else { - fmt::print(os, "{}\t{}\t{}\n", col1, col2, col3); - } + auto print_line = [&os] (auto col1, auto col2, auto col3) { + fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3); }; - fmt::print(os, "Permits with state {}, sorted by {}\n", state, sort_by_memory ? "memory" : "count"); + fmt::print(os, "Permits with state {}\n", state); print_line("count", "memory", "name"); for (const auto& summary : permit_summaries) { total.count += summary.count; @@ -343,13 +317,11 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con permit_stats total; fmt::print(os, "Semaphore {}: {}, dumping permit diagnostics:\n", semaphore.name(), problem); - total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::admitted, true); + total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::active); fmt::print(os, "\n"); - total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::inactive, false); + total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::inactive); fmt::print(os, "\n"); - total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting, false); - fmt::print(os, "\n"); - total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::registered, false); + total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting); fmt::print(os, "\n"); fmt::print(os, "Total: permits: {}, memory: {}\n", total.count, utils::to_hr_size(total.memory)); } @@ -385,7 +357,10 @@ void reader_concurrency_semaphore::inactive_read::detach() noexcept { } void reader_concurrency_semaphore::inactive_read_handle::abandon() noexcept { - delete std::exchange(_irp, nullptr); + if (_irp) { + _sem->close_reader(std::move(_irp->reader)); + delete std::exchange(_irp, nullptr); + } } void reader_concurrency_semaphore::signal(const resources& r) noexcept { @@ -429,11 +404,9 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore: auto& permit_impl = *reader.permit()._impl; // Implies _inactive_reads.empty(), we don't queue new readers before // evicting all inactive reads. - // FIXME: #4758, workaround for keeping tabs on un-admitted reads that are - // still registered as inactive. Without the below check, these can - // accumulate without limit. The real fix is #4758 -- that is to make all - // reads pass admission before getting started. - if (_wait_list.empty() && (permit_impl.get_state() == reader_permit::state::admitted || _resources >= permit_impl.resources())) { + // Checking the _wait_list covers the count resources only, so check memory + // separately. + if (_wait_list.empty() && _resources.memory > 0) { try { auto irp = std::make_unique(std::move(reader)); auto& ir = *irp; @@ -634,6 +607,12 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) { } } +std::string reader_concurrency_semaphore::dump_diagnostics() const { + std::ostringstream os; + do_dump_reader_permit_diagnostics(os, *this, *_permit_list, "user request"); + return os.str(); +} + // A file that tracks the memory usage of buffers resulting from read // operations. class tracking_file_impl : public file_impl { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index c41b7916da..c831966b82 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -310,4 +310,6 @@ public: } void broken(std::exception_ptr ex = {}); + + std::string dump_diagnostics() const; }; diff --git a/reader_permit.hh b/reader_permit.hh index 6974a3ca6c..ddc4235164 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -91,10 +91,9 @@ public: class resource_units; enum class state { - registered, // read is registered, but didn't attempt admission yet waiting, // waiting for admission - admitted, - inactive, // un-admitted reads that are registered as inactive + active, + inactive, }; class impl; diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index ec1846c280..0397e41042 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -971,6 +971,219 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele BOOST_REQUIRE(semaphore.available_resources() == initial_resources); } +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_closes_reader) { + simple_schema s; + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + auto stop_sem = deferred_stop(semaphore); + + auto permit = semaphore.make_permit(s.schema().get(), get_name()); + { + auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit)); + // The handle is destroyed here, triggering the destrution of the inactive read. + // If the test fails an assert() is triggered due to the reader being + // destroyed without having been closed before. + } +} + +// This unit test passes a read through admission again-and-again, just +// like an evictable reader would be during its lifetime. When readmitted +// the read sometimes has to wait and sometimes not. This is to check that +// the readmitting a previously admitted reader doesn't leak any units. +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves_units) { + simple_schema s; + const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024}; + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + + auto permit = semaphore.make_permit(s.schema().get(), get_name()); + auto stop_sem = deferred_stop(semaphore); + + std::optional residue_units; + + for (int i = 0; i < 10; ++i) { + const auto have_residue_units = bool(residue_units); + + auto current_resources = initial_resources; + if (have_residue_units) { + current_resources -= residue_units->resources(); + } + BOOST_REQUIRE(semaphore.available_resources() == current_resources); + + std::optional admitted_units; + if (i % 2) { + const auto consumed_resources = semaphore.available_resources(); + semaphore.consume(consumed_resources); + + auto units_fut = permit.wait_admission(1024, db::no_timeout); + BOOST_REQUIRE(!units_fut.available()); + + semaphore.signal(consumed_resources); + admitted_units = units_fut.get(); + } else { + admitted_units = permit.wait_admission(1024, db::no_timeout).get(); + } + + current_resources -= admitted_units->resources(); + BOOST_REQUIRE(semaphore.available_resources() == current_resources); + + residue_units.emplace(permit.consume_resources(reader_resources(0, 100))); + if (!have_residue_units) { + current_resources -= residue_units->resources(); + } + BOOST_REQUIRE(semaphore.available_resources() == current_resources); + + auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit)); + BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); + } + + BOOST_REQUIRE(semaphore.available_resources() == initial_resources - residue_units->resources()); + + residue_units.reset(); + + BOOST_REQUIRE(semaphore.available_resources() == initial_resources); +} + +// This unit test checks that the semaphore doesn't get into a deadlock +// when contended, in the presence of many memory-only reads (that don't +// wait for admission). This is tested by simulating the 3 kind of reads we +// currently have in the system: +// * memory-only: reads that don't pass admission and only own memory. +// * admitted: reads that pass admission. +// * evictable: admitted reads that are furthermore evictable. +// +// The test creates and runs a large number of these reads in parallel, +// read kinds being selected randomly, then creates a watchdog which +// kills the test if no progress is being made. +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { + class reader { + class skeleton_reader : public flat_mutation_reader::impl { + reader_permit::resource_units _base_resources; + std::optional _resources; + public: + skeleton_reader(schema_ptr s, reader_permit permit, reader_permit::resource_units res) + : impl(std::move(s), std::move(permit)), _base_resources(std::move(res)) { } + virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + _resources.emplace(_permit.consume_resources(reader_resources(0, tests::random::get_int(1024, 2048)))); + return make_ready_future<>(); + } + virtual future<> next_partition() override { return make_ready_future<>(); } + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); } + virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); } + virtual future<> close() noexcept override { + _resources.reset(); + return make_ready_future<>(); + } + }; + struct reader_visitor { + reader& r; + future<> operator()(std::monostate& ms) { return r.tick(ms); } + future<> operator()(flat_mutation_reader& reader) { return r.tick(reader); } + future<> operator()(reader_concurrency_semaphore::inactive_read_handle& handle) { return r.tick(handle); } + }; + + private: + schema_ptr _schema; + reader_permit _permit; + bool _memory_only = true; + bool _evictable = false; + std::optional _units; + std::variant _reader; + + private: + future<> make_reader() { + auto res = _permit.consume_memory(); + if (!_memory_only) { + res = co_await _permit.wait_admission(1024, db::no_timeout); + } + _reader = make_flat_mutation_reader(_schema, _permit, std::move(res)); + } + future<> tick(std::monostate&) { + co_await make_reader(); + co_await tick(std::get(_reader)); + } + future<> tick(flat_mutation_reader& reader) { + co_await reader.fill_buffer(db::no_timeout); + if (_evictable) { + _reader = _permit.semaphore().register_inactive_read(std::move(reader)); + } + } + future<> tick(reader_concurrency_semaphore::inactive_read_handle& handle) { + if (auto reader = _permit.semaphore().unregister_inactive_read(std::move(handle)); reader) { + _reader = std::move(*reader); + } else { + co_await make_reader(); + } + co_await tick(std::get(_reader)); + } + + public: + reader(schema_ptr s, reader_permit permit, bool memory_only, bool evictable) + : _schema(std::move(s)) + , _permit(std::move(permit)) + , _memory_only(memory_only) + , _evictable(evictable) + , _units(_permit.consume_memory(tests::random::get_int(128, 1024))) + { + } + future<> tick() { + return std::visit(reader_visitor{*this}, _reader); + } + future<> close() noexcept { + if (auto reader = std::get_if(&_reader)) { + return reader->close(); + } + return make_ready_future<>(); + } + }; + + const auto count = 10; + const auto num_readers = 512; + const auto ticks = 1000; + + simple_schema s; + reader_concurrency_semaphore semaphore(count, count * 1024, get_name()); + auto stop_sem = deferred_stop(semaphore); + + std::list> readers; + unsigned nr_memory_only = 0; + unsigned nr_admitted = 0; + unsigned nr_evictable = 0; + + for (auto i = 0; i < num_readers; ++i) { + const auto memory_only = tests::random::get_bool(); + const auto evictable = !memory_only && tests::random::get_bool(); + if (memory_only) { + ++nr_memory_only; + } else if (evictable) { + ++nr_evictable; + } else { + ++nr_admitted; + } + readers.emplace_back(reader(s.schema(), semaphore.make_permit(s.schema().get(), fmt::format("reader{}", i)), memory_only, evictable)); + } + + testlog.info("Created {} readers, memory_only={}, admitted={}, evictable={}", readers.size(), nr_memory_only, nr_admitted, nr_evictable); + + bool watchdog_touched = false; + auto watchdog = timer([&semaphore, &watchdog_touched] { + if (!watchdog_touched) { + testlog.error("Watchdog detected a deadlock, dumping diagnostics before killing the test: {}", semaphore.dump_diagnostics()); + semaphore.broken(std::make_exception_ptr(std::runtime_error("test killed by watchdog"))); + } + watchdog_touched = false; + }); + watchdog.arm_periodic(std::chrono::seconds(30)); + + parallel_for_each(readers, [&] (std::optional& r) -> future<> { + for (auto i = 0; i < ticks; ++i) { + watchdog_touched = true; + co_await r->tick(); + } + co_await r->close(); + r.reset(); + watchdog_touched = true; + }).get(); +} + static sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, std::vector mutations) { static thread_local auto tmp = tmpdir(); diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index f849a6c34b..49ed93e5e8 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -714,7 +714,10 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { nullptr, db::no_timeout).get(); - BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 1); + // The second read might be evicted too if it consumes more + // memory than the first and hence triggers memory control when + // saved in the querier cache. + BOOST_CHECK_GE(db.get_querier_cache_stats().resource_based_evictions, 1); // We want to read the entire partition so that the querier // is not saved at the end and thus ensure it is destroyed.