From d2ddaced4e65dfaedad60914b66fd5fb63fa85c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 15:42:54 +0300 Subject: [PATCH] test/lib/reader_lifecycle_policy: get rid of lifecycle workarounds The lifecycle of the reader lifecycle policy and all the resources the reads use is now enclosed in that of the multishard reader thanks to its close() method. We can now remove all the workarounds we had in place to keep different resources as long as background reader cleanup finishes. --- ...ombining_reader_as_mutation_source_test.cc | 6 +- test/boost/mutation_reader_test.cc | 62 +++++---------- test/lib/reader_lifecycle_policy.hh | 77 +------------------ 3 files changed, 25 insertions(+), 120 deletions(-) diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc index 12513c69d1..728e11f2b0 100644 --- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc +++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc @@ -51,8 +51,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { // It has to be a container that does not invalidate pointers std::list keep_alive_sharder; - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) { @@ -109,7 +107,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { return reader; }; - auto lifecycle_policy = seastar::make_shared(std::move(factory), operations_gate, semaphore_registry, evict_paused_readers); + auto lifecycle_policy = seastar::make_shared(std::move(factory), evict_paused_readers); auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, tests::make_permit(), range, slice, pc, trace_state, fwd_mr); if (fwd_sm == streamed_mutation::forwarding::yes) { @@ -129,6 +127,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)"); run_mutation_source_tests(make_populate(true, true)); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index d02ec0a754..1d43aa6d2a 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -1781,9 +1781,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; @@ -1799,7 +1796,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { }; assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), query::full_partition_range, @@ -1811,7 +1808,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { BOOST_REQUIRE(shards_touched.at(i)); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2010,8 +2007,7 @@ struct multishard_reader_for_read_ahead { std::unique_ptr pr; }; -multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s, test_reader_lifecycle_policy::operations_gate& operations_gate, - test_reader_lifecycle_policy::semaphore_registry& semaphore_registry) { +multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s) { auto remote_controls = std::vector>>(); remote_controls.reserve(smp::count); for (unsigned i = 0; i < smp::count; ++i) { @@ -2068,7 +2064,7 @@ multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(s dht::ring_position::ending_at(pkeys_by_tokens.rbegin()->first))); auto sharder = std::make_unique(s.schema()->get_sharder(), std::move(pkeys_by_tokens)); - auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), *pr, s.schema()->full_slice(), service::get_local_sstable_query_read_priority()); return {std::move(reader), std::move(sharder), std::move(remote_controls), std::move(pr)}; @@ -2082,8 +2078,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { } auto no_shards = smp::count - 1; - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); @@ -2102,7 +2096,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { assert_that(make_multishard_combining_reader_for_tests( *sharder, - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), query::full_partition_range, @@ -2115,7 +2109,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { } BOOST_REQUIRE(!shards_touched[no_shards]); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2126,9 +2120,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; @@ -2171,7 +2162,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed inclusive_end ? "inclusive" : "exclusive"); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), pr, @@ -2184,7 +2175,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed BOOST_CHECK(shards_touched[i] == expected_shards_touched[i]); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2218,13 +2209,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto s = simple_schema(); - auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry); + auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s); auto&& reader = reader_sharder_remote_controls__.reader; auto&& sharder = reader_sharder_remote_controls__.sharder; auto&& remote_controls = reader_sharder_remote_controls__.remote_controls; @@ -2269,7 +2257,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending std::logical_and()).get0(); })); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2279,13 +2267,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto s = simple_schema(); - auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry); + auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s); auto&& reader = reader_sharder_remote_controls_pr.reader; auto&& sharder = reader_sharder_remote_controls_pr.sharder; auto&& remote_controls = reader_sharder_remote_controls_pr.remote_controls; @@ -2348,14 +2333,11 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe std::logical_and()).get0(); })); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks" " WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); @@ -2407,7 +2389,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { return reader; }; auto reader = make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), schema, tests::make_permit(), query::full_partition_range, @@ -2428,7 +2410,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { } assertions.produces_end_of_stream(); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2516,10 +2498,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mf.as_clustering_row().key().equal(*s.schema(), ckey)); } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - - do_with_cql_env_thread([=, &operations_gate, &semaphore_registry, s = std::move(s)] (cql_test_env& env) mutable -> future<> { + do_with_cql_env_thread([=, s = std::move(s)] (cql_test_env& env) mutable -> future<> { auto factory = [=, gs = global_simple_schema(s)] ( schema_ptr, const dht::partition_range& range, @@ -2545,7 +2524,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mut_opt); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry, true), + seastar::make_shared(std::move(factory), true), s.schema(), tests::make_permit(), query::full_partition_range, @@ -2553,7 +2532,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic service::get_local_sstable_query_read_priority())) .produces_partition(*mut_opt); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2567,9 +2546,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get(); @@ -2613,7 +2589,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { streamed_mutation::forwarding::no, fwd_mr); }; auto reference_reader = make_filtering_reader( - make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory), operations_gate, semaphore_registry), + make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory)), schema, tests::make_permit(), partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()), [&remote_partitioner] (const dht::decorated_key& pkey) { return remote_partitioner.shard_of(pkey.token()) == 0; @@ -2638,7 +2614,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { assert_that(tested_muts[i]).is_equal_to(reference_muts[i]); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index b52bd1e4bc..cdcfd7ca75 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -27,68 +27,6 @@ class test_reader_lifecycle_policy : public reader_lifecycle_policy , public enable_shared_from_this { -public: - class operations_gate { - public: - class operation { - gate* _g = nullptr; - - private: - void leave() { - if (_g) { - _g->leave(); - } - } - - public: - operation() = default; - explicit operation(gate& g) : _g(&g) { _g->enter(); } - operation(const operation&) = delete; - operation(operation&& o) : _g(std::exchange(o._g, nullptr)) { } - ~operation() { leave(); } - operation& operator=(const operation&) = delete; - operation& operator=(operation&& o) { - leave(); - _g = std::exchange(o._g, nullptr); - return *this; - } - }; - - private: - std::vector _gates; - - public: - operations_gate() - : _gates(smp::count) { - } - - operation enter() { - return operation(_gates[this_shard_id()]); - } - - future<> close() { - return parallel_for_each(boost::irange(smp::count), [this] (shard_id shard) { - return smp::submit_to(shard, [this, shard] { - return _gates[shard].close(); - }); - }); - } - }; - - class semaphore_registry { - std::vector< // 1 per shard - std::list> _semaphores; - public: - semaphore_registry() : _semaphores(smp::count) { } - semaphore_registry(semaphore_registry&&) = delete; - semaphore_registry(const semaphore_registry&) = delete; - template - reader_concurrency_semaphore& create_semaphore(Arg&&... arg) { - return _semaphores[this_shard_id()].emplace_back(std::forward(arg)...); - } - }; - -private: using factory_function = std::function; struct reader_context { - reader_concurrency_semaphore* semaphore = nullptr; - operations_gate::operation op; + std::optional semaphore; std::optional range; std::optional slice; @@ -109,17 +46,13 @@ private: }; factory_function _factory_function; - operations_gate& _operation_gate; - semaphore_registry& _semaphore_registry; std::vector>> _contexts; std::vector> _destroy_futures; bool _evict_paused_readers = false; public: - explicit test_reader_lifecycle_policy(factory_function f, operations_gate& g, semaphore_registry& semaphore_registry, bool evict_paused_readers = false) + explicit test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false) : _factory_function(std::move(f)) - , _operation_gate(g) - , _semaphore_registry(semaphore_registry) , _contexts(smp::count) , _evict_paused_readers(evict_paused_readers) { } @@ -138,11 +71,9 @@ public: } else { _contexts[shard] = make_foreign(std::make_unique(range, slice)); } - _contexts[shard]->op = _operation_gate.enter(); return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } virtual future<> destroy_reader(stopped_reader reader) noexcept override { - // waited via _operation_gate auto ctx = &*_contexts[this_shard_id()]; auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); @@ -159,9 +90,9 @@ public: } if (_evict_paused_readers) { // Create with no memory, so all inactive reads are immediately evicted. - _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard)); + _contexts[shard]->semaphore.emplace(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard)); } else { - _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(reader_concurrency_semaphore::no_limits{}); + _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}); } return *_contexts[shard]->semaphore; }