diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc index 12513c69d1..728e11f2b0 100644 --- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc +++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc @@ -51,8 +51,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { // It has to be a container that does not invalidate pointers std::list keep_alive_sharder; - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) { @@ -109,7 +107,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { return reader; }; - auto lifecycle_policy = seastar::make_shared(std::move(factory), operations_gate, semaphore_registry, evict_paused_readers); + auto lifecycle_policy = seastar::make_shared(std::move(factory), evict_paused_readers); auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, tests::make_permit(), range, slice, pc, trace_state, fwd_mr); if (fwd_sm == streamed_mutation::forwarding::yes) { @@ -129,6 +127,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)"); run_mutation_source_tests(make_populate(true, true)); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index d02ec0a754..1d43aa6d2a 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -1781,9 +1781,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; @@ -1799,7 +1796,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { }; assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), query::full_partition_range, @@ -1811,7 +1808,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { BOOST_REQUIRE(shards_touched.at(i)); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2010,8 +2007,7 @@ struct multishard_reader_for_read_ahead { std::unique_ptr pr; }; -multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s, test_reader_lifecycle_policy::operations_gate& operations_gate, - test_reader_lifecycle_policy::semaphore_registry& semaphore_registry) { +multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s) { auto remote_controls = std::vector>>(); remote_controls.reserve(smp::count); for (unsigned i = 0; i < smp::count; ++i) { @@ -2068,7 +2064,7 @@ multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(s dht::ring_position::ending_at(pkeys_by_tokens.rbegin()->first))); auto sharder = std::make_unique(s.schema()->get_sharder(), std::move(pkeys_by_tokens)); - auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), *pr, s.schema()->full_slice(), service::get_local_sstable_query_read_priority()); return {std::move(reader), std::move(sharder), std::move(remote_controls), std::move(pr)}; @@ -2082,8 +2078,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { } auto no_shards = smp::count - 1; - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); @@ -2102,7 +2096,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { assert_that(make_multishard_combining_reader_for_tests( *sharder, - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), query::full_partition_range, @@ -2115,7 +2109,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { } BOOST_REQUIRE(!shards_touched[no_shards]); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2126,9 +2120,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; @@ -2171,7 +2162,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed inclusive_end ? "inclusive" : "exclusive"); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), pr, @@ -2184,7 +2175,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed BOOST_CHECK(shards_touched[i] == expected_shards_touched[i]); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2218,13 +2209,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto s = simple_schema(); - auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry); + auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s); auto&& reader = reader_sharder_remote_controls__.reader; auto&& sharder = reader_sharder_remote_controls__.sharder; auto&& remote_controls = reader_sharder_remote_controls__.remote_controls; @@ -2269,7 +2257,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending std::logical_and()).get0(); })); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2279,13 +2267,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto s = simple_schema(); - auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry); + auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s); auto&& reader = reader_sharder_remote_controls_pr.reader; auto&& sharder = reader_sharder_remote_controls_pr.sharder; auto&& remote_controls = reader_sharder_remote_controls_pr.remote_controls; @@ -2348,14 +2333,11 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe std::logical_and()).get0(); })); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks" " WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); @@ -2407,7 +2389,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { return reader; }; auto reader = make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), schema, tests::make_permit(), query::full_partition_range, @@ -2428,7 +2410,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { } assertions.produces_end_of_stream(); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2516,10 +2498,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mf.as_clustering_row().key().equal(*s.schema(), ckey)); } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - - do_with_cql_env_thread([=, &operations_gate, &semaphore_registry, s = std::move(s)] (cql_test_env& env) mutable -> future<> { + do_with_cql_env_thread([=, s = std::move(s)] (cql_test_env& env) mutable -> future<> { auto factory = [=, gs = global_simple_schema(s)] ( schema_ptr, const dht::partition_range& range, @@ -2545,7 +2524,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mut_opt); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry, true), + seastar::make_shared(std::move(factory), true), s.schema(), tests::make_permit(), query::full_partition_range, @@ -2553,7 +2532,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic service::get_local_sstable_query_read_priority())) .produces_partition(*mut_opt); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2567,9 +2546,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get(); @@ -2613,7 +2589,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { streamed_mutation::forwarding::no, fwd_mr); }; auto reference_reader = make_filtering_reader( - make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory), operations_gate, semaphore_registry), + make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory)), schema, tests::make_permit(), partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()), [&remote_partitioner] (const dht::decorated_key& pkey) { return remote_partitioner.shard_of(pkey.token()) == 0; @@ -2638,7 +2614,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { assert_that(tested_muts[i]).is_equal_to(reference_muts[i]); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index b52bd1e4bc..cdcfd7ca75 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -27,68 +27,6 @@ class test_reader_lifecycle_policy : public reader_lifecycle_policy , public enable_shared_from_this { -public: - class operations_gate { - public: - class operation { - gate* _g = nullptr; - - private: - void leave() { - if (_g) { - _g->leave(); - } - } - - public: - operation() = default; - explicit operation(gate& g) : _g(&g) { _g->enter(); } - operation(const operation&) = delete; - operation(operation&& o) : _g(std::exchange(o._g, nullptr)) { } - ~operation() { leave(); } - operation& operator=(const operation&) = delete; - operation& operator=(operation&& o) { - leave(); - _g = std::exchange(o._g, nullptr); - return *this; - } - }; - - private: - std::vector _gates; - - public: - operations_gate() - : _gates(smp::count) { - } - - operation enter() { - return operation(_gates[this_shard_id()]); - } - - future<> close() { - return parallel_for_each(boost::irange(smp::count), [this] (shard_id shard) { - return smp::submit_to(shard, [this, shard] { - return _gates[shard].close(); - }); - }); - } - }; - - class semaphore_registry { - std::vector< // 1 per shard - std::list> _semaphores; - public: - semaphore_registry() : _semaphores(smp::count) { } - semaphore_registry(semaphore_registry&&) = delete; - semaphore_registry(const semaphore_registry&) = delete; - template - reader_concurrency_semaphore& create_semaphore(Arg&&... arg) { - return _semaphores[this_shard_id()].emplace_back(std::forward(arg)...); - } - }; - -private: using factory_function = std::function; struct reader_context { - reader_concurrency_semaphore* semaphore = nullptr; - operations_gate::operation op; + std::optional semaphore; std::optional range; std::optional slice; @@ -109,17 +46,13 @@ private: }; factory_function _factory_function; - operations_gate& _operation_gate; - semaphore_registry& _semaphore_registry; std::vector>> _contexts; std::vector> _destroy_futures; bool _evict_paused_readers = false; public: - explicit test_reader_lifecycle_policy(factory_function f, operations_gate& g, semaphore_registry& semaphore_registry, bool evict_paused_readers = false) + explicit test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false) : _factory_function(std::move(f)) - , _operation_gate(g) - , _semaphore_registry(semaphore_registry) , _contexts(smp::count) , _evict_paused_readers(evict_paused_readers) { } @@ -138,11 +71,9 @@ public: } else { _contexts[shard] = make_foreign(std::make_unique(range, slice)); } - _contexts[shard]->op = _operation_gate.enter(); return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } virtual future<> destroy_reader(stopped_reader reader) noexcept override { - // waited via _operation_gate auto ctx = &*_contexts[this_shard_id()]; auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); @@ -159,9 +90,9 @@ public: } if (_evict_paused_readers) { // Create with no memory, so all inactive reads are immediately evicted. - _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard)); + _contexts[shard]->semaphore.emplace(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard)); } else { - _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(reader_concurrency_semaphore::no_limits{}); + _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}); } return *_contexts[shard]->semaphore; }