From 15a157611a43ad7268c7d54605a8bb03c62a72dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 17 Mar 2021 17:01:08 +0200 Subject: [PATCH 1/6] reader_concurrency_semaphore: make admission conditions consistent Currently there are two places where we check admission conditions: `do_wait_admission()` and `signal()`. Both use `has_available_units()` to check resource availability, but the former has some additional resource related conditions on top (in `may_proceed()`), which lead to the two paths working with slightly different conditions. To fix, push down all resource availability related checks to `has_available_units()` to ensure admission conditions are consistent across all paths. (cherry picked from commit d90cd6402c74004527555a89243231c6e5c11331) --- reader_concurrency_semaphore.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 6d0c4ae714..398bc71509 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -425,13 +425,13 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read() { } bool reader_concurrency_semaphore::has_available_units(const resources& r) const { - return bool(_resources) && _resources >= r; + // Special case: when there is no active reader (based on count) admit one + // regardless of availability of memory. + return (bool(_resources) && _resources >= r) || _resources.count == _initial_resources.count; } bool reader_concurrency_semaphore::may_proceed(const resources& r) const { - // Special case: when there is no active reader (based on count) admit one - // regardless of availability of memory. - return _wait_list.empty() && (has_available_units(r) || _resources.count == _initial_resources.count); + return _wait_list.empty() && has_available_units(r); } future reader_concurrency_semaphore::do_wait_admission(reader_permit permit, size_t memory, From f23052ae6498402632ab7683c5736cf5ab803022 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 29 Apr 2021 11:44:58 +0300 Subject: [PATCH 2/6] test: multishard_mutation_query_test: fuzzy-test: don't consume resource up-front MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fuzzy test consumes a large chunk of resource from the semaphore up-front to simulate a contested semaphore. This isn't an accurate simulation, because no permit will have more than 1 units in reality. Furthermore this can even cause a deadlock since 8aaa3a7 as now we rely on all count units being available to make forward progress when memory is scarce. This patch just cuts out this part of the test, we now have a dedicated unit test for checking a heavily contested semaphore, that does it properly, so no need to try to fix this clumsy attempt that is just making trouble at this point. Refs: #8493 Tests: release(multishard_mutation_query_test:fuzzy_test) Signed-off-by: Botond Dénes Message-Id: <20210429084458.40406-1-bdenes@scylladb.com> (cherry picked from commit 26ae9555d1370d4fa30a7366b0b160c5cf656069) --- test/boost/multishard_mutation_query_test.cc | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/test/boost/multishard_mutation_query_test.cc b/test/boost/multishard_mutation_query_test.cc index 9f85652aec..c10e0acacb 100644 --- a/test/boost/multishard_mutation_query_test.cc +++ b/test/boost/multishard_mutation_query_test.cc @@ -977,14 +977,7 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) { const auto& partitions = pop_desc.partitions; smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(pop_desc.schema), &partitions] { - auto s = gs.get(); - auto& sem = db->local().get_reader_concurrency_semaphore(); - - auto resources = sem.available_resources(); - resources -= reader_concurrency_semaphore::resources{1, 0}; - auto permit = sem.make_permit(s.get(), "fuzzy-test"); - - return run_fuzzy_test_workload(cfg, *db, std::move(s), partitions).finally([units = permit.consume_resources(resources)] {}); + return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions); }).handle_exception([seed] (std::exception_ptr e) { testlog.error("Test workload failed with exception {}." " To repeat this particular run, replace the random seed of the test, with that of this run ({})." From 1c0557c6382fc4e82c405c2b53fab299ff0b2f14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 9 Apr 2021 15:44:09 +0300 Subject: [PATCH 3/6] reader_permit: always forward resources This commit conceptually reverts 4c8ab10. Said commit was meant to prevent the scenario where memory-only permits -- those that don't pass admission but still consume memory -- completely prevent the admission of reads, possibly even causing a deadlock because a permit might even blocks its own admission. The protection introduced by said commit however proved to be very problematic. It made the status of resources on the permit very hard to reason about and created loopholes via which permits could accumulate without tracking or they could even leak resources. Instead of continuing to patch this broken system, this commit does away with this "protection" based on the observation that deadlocks are now prevented anyway by the admission criteria introduced by 0fe75571d9, which admits a read anyway when all the initial count resources are available (meaning no admitted reader is alive), regardless of availability of memory. The benefits of this revert is that the semaphore now knows about all the resources and is able to do its job better as it is not "lied to" about resource by the permits. Furthermore the status of a permit's resources is much simpler to reason about, there are no more loopholes in unexpected state transitions to swallow/leak resources. To prove that this revert is indeed safe, in the next commit we add robust tests that stress test admission on a highly contested semaphore. This patch also does away with the registered/admitted differentiation of permits, as this doesn't make much sense anymore, instead these two are unified into a single "active" state. One can always tell whether a permit was admitted or not from whether it owns count resources anyway. (cherry picked from commit caaa8ef59ae4461f607d4eb6d0d78b4ad11276f1) --- reader_concurrency_semaphore.cc | 48 ++++++++++---------------------- reader_permit.hh | 3 +- test/boost/querier_cache_test.cc | 5 +++- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 398bc71509..1d4d70b528 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -76,7 +76,7 @@ class reader_permit::impl : public boost::intrusive::list_base_hook; -static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state, bool sort_by_memory) { +static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state) { struct permit_summary { const schema* s; std::string_view op_name; @@ -266,25 +258,17 @@ static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const pe } } - std::ranges::sort(permit_summaries, [sort_by_memory] (const permit_summary& a, const permit_summary& b) { - if (sort_by_memory) { - return a.memory < b.memory; - } else { - return a.count < b.count; - } + std::ranges::sort(permit_summaries, [] (const permit_summary& a, const permit_summary& b) { + return a.memory < b.memory; }); permit_stats total; - auto print_line = [&os, sort_by_memory] (auto col1, auto col2, auto col3) { - if (sort_by_memory) { - fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3); - } else { - fmt::print(os, "{}\t{}\t{}\n", col1, col2, col3); - } + auto print_line = [&os] (auto col1, auto col2, auto col3) { + fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3); }; - fmt::print(os, "Permits with state {}, sorted by {}\n", state, sort_by_memory ? "memory" : "count"); + fmt::print(os, "Permits with state {}\n", state); print_line("count", "memory", "name"); for (const auto& summary : permit_summaries) { total.count += summary.count; @@ -310,11 +294,9 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con permit_stats total; fmt::print(os, "Semaphore {}: {}, dumping permit diagnostics:\n", semaphore.name(), problem); - total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::admitted, true); + total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::active); fmt::print(os, "\n"); - total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting, false); - fmt::print(os, "\n"); - total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::registered, false); + total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting); fmt::print(os, "\n"); fmt::print(os, "Total: permits: {}, memory: {}\n", total.count, utils::to_hr_size(total.memory)); } @@ -375,7 +357,7 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() { reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(std::unique_ptr ir) { // Implies _inactive_reads.empty(), we don't queue new readers before // evicting all inactive reads. - if (_wait_list.empty()) { + if (_wait_list.empty() && _resources.memory > 0) { const auto [it, _] = _inactive_reads.emplace(_next_id++, std::move(ir)); (void)_; ++_stats.inactive_reads; diff --git a/reader_permit.hh b/reader_permit.hh index 4280e79f5c..12b5a518d3 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -91,9 +91,8 @@ public: class resource_units; enum class state { - registered, // read is registered, but didn't attempt admission yet waiting, // waiting for admission - admitted, + active, }; class impl; diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index ad1be6528c..7dd1f86fe3 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -712,7 +712,10 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { nullptr, db::no_timeout).get(); - BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 1); + // The second read might be evicted too if it consumes more + // memory than the first and hence triggers memory control when + // saved in the querier cache. + BOOST_CHECK_GE(db.get_querier_cache_stats().resource_based_evictions, 1); // We want to read the entire partition so that the querier // is not saved at the end and thus ensure it is destroyed. From 960f93383bce414afd6fe39971c27d5afd7a6541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 25 Mar 2021 13:54:56 +0200 Subject: [PATCH 4/6] reader_concurrency_semaphore: add dump_diagnostics() Allow semaphore related tests to include a diagnostics printout in error messages to help determine why the test failed. (cherry picked from commit d246e2df0aa80b2c3c422e1241db12f3dd2926ea) --- reader_concurrency_semaphore.cc | 6 ++++++ reader_concurrency_semaphore.hh | 2 ++ 2 files changed, 8 insertions(+) diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 1d4d70b528..1d1ff0e159 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -464,6 +464,12 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) { } } +std::string reader_concurrency_semaphore::dump_diagnostics() const { + std::ostringstream os; + do_dump_reader_permit_diagnostics(os, *this, *_permit_list, "user request"); + return os.str(); +} + // A file that tracks the memory usage of buffers resulting from read // operations. class tracking_file_impl : public file_impl { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 61c1a3a972..a249c2203c 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -237,4 +237,6 @@ public: } void broken(std::exception_ptr ex); + + std::string dump_diagnostics() const; }; From 3c3fc18777c247f0a69a6bf3352a200c83c0b1dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 15 Apr 2021 16:24:07 +0300 Subject: [PATCH 5/6] test: mutation_reader_test: add test_reader_concurrency_semaphore_readmission_preserves_units This unit test passes a read through admission again-and-again, just like an evictable reader would be during its lifetime. When readmitted the read sometimes has to wait and sometimes not. This is to check that the readmitting a previously admitted reader doesn't leak any units. (cherry picked from commit cadc26de38774141c98a84bc9601c8155bfbc83d) --- test/boost/mutation_reader_test.cc | 85 ++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index d4d8e53855..c9b2fab12d 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -896,6 +896,91 @@ sstables::shared_sstable create_sstable(sstables::test_env& env, simple_schema& , mutations); } +namespace { + +class generic_inactive_read : public reader_concurrency_semaphore::inactive_read { + flat_mutation_reader_opt _reader; + +private: + explicit generic_inactive_read(flat_mutation_reader&& rd) : _reader(std::move(rd)) { } + + virtual void evict() override { + _reader = {}; + } + +public: + static std::unique_ptr make(flat_mutation_reader&& rd) { + return std::make_unique(generic_inactive_read(std::move(rd))); + } + + static flat_mutation_reader_opt get_reader(std::unique_ptr&& ir) { + if (!ir) { + return {}; + } + auto gir = dynamic_cast(ir.get()); + BOOST_REQUIRE(gir); + return std::move(gir->_reader); + } +}; + +} // anonymous namespace + +// This unit test passes a read through admission again-and-again, just +// like an evictable reader would be during its lifetime. When readmitted +// the read sometimes has to wait and sometimes not. This is to check that +// the readmitting a previously admitted reader doesn't leak any units. +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves_units) { + simple_schema s; + const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024}; + reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); + + auto permit = semaphore.make_permit(s.schema().get(), get_name()); + + std::optional residue_units; + + for (int i = 0; i < 10; ++i) { + const auto have_residue_units = bool(residue_units); + + auto current_resources = initial_resources; + if (have_residue_units) { + current_resources -= residue_units->resources(); + } + BOOST_REQUIRE(semaphore.available_resources() == current_resources); + + std::optional admitted_units; + if (i % 2) { + const auto consumed_resources = semaphore.available_resources(); + semaphore.consume(consumed_resources); + + auto units_fut = permit.wait_admission(1024, db::no_timeout); + BOOST_REQUIRE(!units_fut.available()); + + semaphore.signal(consumed_resources); + admitted_units = units_fut.get(); + } else { + admitted_units = permit.wait_admission(1024, db::no_timeout).get(); + } + + current_resources -= admitted_units->resources(); + BOOST_REQUIRE(semaphore.available_resources() == current_resources); + + residue_units.emplace(permit.consume_resources(reader_resources(0, 100))); + if (!have_residue_units) { + current_resources -= residue_units->resources(); + } + BOOST_REQUIRE(semaphore.available_resources() == current_resources); + + auto handle = semaphore.register_inactive_read(generic_inactive_read::make(make_empty_flat_reader(s.schema(), permit))); + BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); + } + + BOOST_REQUIRE(semaphore.available_resources() == initial_resources - residue_units->resources()); + + residue_units.reset(); + + BOOST_REQUIRE(semaphore.available_resources() == initial_resources); +} + static sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, std::vector mutations) { static thread_local auto tmp = tmpdir(); From a710866235d1ac5c5d456bd16da38180cfe0df96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 15 Apr 2021 18:46:24 +0300 Subject: [PATCH 6/6] test: mutation_reader_test: add test_reader_concurrency_semaphore_forward_progress This unit test checks that the semaphore doesn't get into a deadlock when contended, in the presence of many memory-only reads (that don't wait for admission). This is tested by simulating the 3 kind of reads we currently have in the system: * memory-only: reads that don't pass admission and only own memory. * admitted: reads that pass admission. * evictable: admitted reads that are furthermore evictable. The test creates and runs a large number of these reads in parallel, read kinds being selected randomly, then creates a watchdog which kills the test if no progress is being made. (cherry picked from commit 45d580f056c16b89b96670bbce86e7a38eac0213) --- test/boost/mutation_reader_test.cc | 130 +++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index c9b2fab12d..1cea2a72c8 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -981,6 +981,136 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves BOOST_REQUIRE(semaphore.available_resources() == initial_resources); } +// This unit test checks that the semaphore doesn't get into a deadlock +// when contended, in the presence of many memory-only reads (that don't +// wait for admission). This is tested by simulating the 3 kind of reads we +// currently have in the system: +// * memory-only: reads that don't pass admission and only own memory. +// * admitted: reads that pass admission. +// * evictable: admitted reads that are furthermore evictable. +// +// The test creates and runs a large number of these reads in parallel, +// read kinds being selected randomly, then creates a watchdog which +// kills the test if no progress is being made. +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { + class reader { + class skeleton_reader : public flat_mutation_reader::impl { + reader_permit::resource_units _base_resources; + std::optional _resources; + public: + skeleton_reader(schema_ptr s, reader_permit permit, reader_permit::resource_units res) + : impl(std::move(s), std::move(permit)), _base_resources(std::move(res)) { } + virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + _resources.emplace(_permit.consume_resources(reader_resources(0, tests::random::get_int(1024, 2048)))); + return make_ready_future<>(); + } + virtual void next_partition() override { } + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); } + virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); } + }; + struct reader_visitor { + reader& r; + future<> operator()(std::monostate& ms) { return r.tick(ms); } + future<> operator()(flat_mutation_reader& reader) { return r.tick(reader); } + future<> operator()(reader_concurrency_semaphore::inactive_read_handle& handle) { return r.tick(handle); } + }; + + private: + schema_ptr _schema; + reader_permit _permit; + bool _memory_only = true; + bool _evictable = false; + std::optional _units; + std::variant _reader; + + private: + future<> make_reader() { + auto res = _permit.consume_memory(); + if (!_memory_only) { + res = co_await _permit.wait_admission(1024, db::no_timeout); + } + _reader = make_flat_mutation_reader(_schema, _permit, std::move(res)); + } + future<> tick(std::monostate&) { + co_await make_reader(); + co_await tick(std::get(_reader)); + } + future<> tick(flat_mutation_reader& reader) { + co_await reader.fill_buffer(db::no_timeout); + if (_evictable) { + _reader = _permit.semaphore().register_inactive_read(generic_inactive_read::make(std::move(reader))); + } + } + future<> tick(reader_concurrency_semaphore::inactive_read_handle& handle) { + if (auto reader = generic_inactive_read::get_reader(_permit.semaphore().unregister_inactive_read(std::move(handle))); reader) { + _reader = std::move(*reader); + } else { + co_await make_reader(); + } + co_await tick(std::get(_reader)); + } + + public: + reader(schema_ptr s, reader_permit permit, bool memory_only, bool evictable) + : _schema(std::move(s)) + , _permit(std::move(permit)) + , _memory_only(memory_only) + , _evictable(evictable) + , _units(_permit.consume_memory(tests::random::get_int(128, 1024))) + { + } + future<> tick() { + return std::visit(reader_visitor{*this}, _reader); + } + }; + + const auto count = 10; + const auto num_readers = 512; + const auto ticks = 1000; + + simple_schema s; + reader_concurrency_semaphore semaphore(count, count * 1024, get_name()); + + std::list> readers; + unsigned nr_memory_only = 0; + unsigned nr_admitted = 0; + unsigned nr_evictable = 0; + + for (auto i = 0; i < num_readers; ++i) { + const auto memory_only = tests::random::get_bool(); + const auto evictable = !memory_only && tests::random::get_bool(); + if (memory_only) { + ++nr_memory_only; + } else if (evictable) { + ++nr_evictable; + } else { + ++nr_admitted; + } + readers.emplace_back(reader(s.schema(), semaphore.make_permit(s.schema().get(), fmt::format("reader{}", i)), memory_only, evictable)); + } + + testlog.info("Created {} readers, memory_only={}, admitted={}, evictable={}", readers.size(), nr_memory_only, nr_admitted, nr_evictable); + + bool watchdog_touched = false; + auto watchdog = timer([&semaphore, &watchdog_touched] { + if (!watchdog_touched) { + testlog.error("Watchdog detected a deadlock, dumping diagnostics before killing the test: {}", semaphore.dump_diagnostics()); + semaphore.broken(std::make_exception_ptr(std::runtime_error("test killed by watchdog"))); + } + watchdog_touched = false; + }); + watchdog.arm_periodic(std::chrono::seconds(30)); + + parallel_for_each(readers, [&] (std::optional& r) -> future<> { + for (auto i = 0; i < ticks; ++i) { + watchdog_touched = true; + co_await r->tick(); + } + r.reset(); + watchdog_touched = true; + }).get(); +} + static sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, std::vector mutations) { static thread_local auto tmp = tmpdir();