diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc index b2d489437f..0b8ee7c24a 100644 --- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc +++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc @@ -51,10 +51,11 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { // It has to be a container that does not invalidate pointers std::list keep_alive_sharder; + test_reader_lifecycle_policy::operations_gate operations_gate; - do_with_cql_env([&keep_alive_sharder] (cql_test_env& env) -> future<> { - auto make_populate = [&keep_alive_sharder, &env] (bool evict_paused_readers, bool single_fragment_buffer) { - return [&keep_alive_sharder, &env, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector& mutations) mutable { + do_with_cql_env([&] (cql_test_env& env) -> future<> { + auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) { + return [&, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector& mutations) mutable { // We need to group mutations that have the same token so they land on the same shard. std::map> mutations_by_token; @@ -84,7 +85,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { } keep_alive_sharder.push_back(sharder); - return mutation_source([&keep_alive_sharder, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s, + return mutation_source([&, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -107,7 +108,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { return reader; }; - auto lifecycle_policy = seastar::make_shared(std::move(factory), evict_paused_readers); + auto lifecycle_policy = seastar::make_shared(std::move(factory), operations_gate, evict_paused_readers); auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, range, slice, pc, trace_state, fwd_mr); if (fwd_sm == streamed_mutation::forwarding::yes) { return make_forwardable(std::move(mr)); @@ -126,6 +127,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)"); run_mutation_source_tests(make_populate(true, true)); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } diff --git a/test/boost/multishard_mutation_query_test.cc b/test/boost/multishard_mutation_query_test.cc index 32f84bab67..5affa862ff 100644 --- a/test/boost/multishard_mutation_query_test.cc +++ b/test/boost/multishard_mutation_query_test.cc @@ -1025,9 +1025,9 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) { auto resources = sem.available_resources(); resources -= reader_concurrency_semaphore::resources{1, 0}; - auto permit = sem.consume_resources(resources); + auto permit = sem.make_permit(); - return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions).finally([permit = std::move(permit)] {}); + return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions).finally([units = permit.consume_resources(resources)] {}); }).handle_exception([seed] (std::exception_ptr e) { testlog.error("Test workload failed with exception {}." " To repeat this particular run, replace the random seed of the test, with that of this run ({})." diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 8863300030..dc817722a8 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -1085,8 +1085,7 @@ class dummy_file_impl : public file_impl { SEASTAR_TEST_CASE(reader_restriction_file_tracking) { return async([&] { reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name()); - // Testing the tracker here, no need to have a base cost. - auto permit = semaphore.wait_admission(0, db::no_timeout).get0(); + auto permit = semaphore.make_permit(); { auto tracked_file = make_tracked_file(file(shared_ptr(make_shared())), permit); @@ -1958,7 +1957,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { return; } - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; auto factory = [&shards_touched] ( @@ -1973,7 +1974,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { }; assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory)), + seastar::make_shared(std::move(factory), operations_gate), s.schema(), query::full_partition_range, s.schema()->full_slice(), @@ -1984,7 +1985,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { BOOST_REQUIRE(shards_touched.at(i)); } - return make_ready_future<>(); + return operations_gate.close(); }).get(); } @@ -2179,7 +2180,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending return; } - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { auto remote_controls = std::vector>>(); remote_controls.reserve(smp::count); for (unsigned i = 0; i < smp::count; ++i) { @@ -2222,7 +2225,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending { dummy_sharder sharder(s.schema()->get_sharder(), std::move(pkeys_by_tokens)); - auto reader = make_multishard_combining_reader_for_tests(sharder, seastar::make_shared(std::move(factory)), + auto reader = make_multishard_combining_reader_for_tests(sharder, seastar::make_shared(std::move(factory), operations_gate), s.schema(), query::full_partition_range, s.schema()->full_slice(), service::get_local_sstable_query_read_priority()); reader.fill_buffer(db::no_timeout).get(); BOOST_REQUIRE(reader.is_buffer_full()); @@ -2244,12 +2247,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending std::logical_and()).get0(); })); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks" " WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); env.execute_cql("CREATE TABLE multishard_combining_reader_next_partition_ks.test (pk int, v int, PRIMARY KEY(pk));").get(); @@ -2300,7 +2305,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { return reader; }; auto reader = make_multishard_combining_reader( - seastar::make_shared(std::move(factory)), + seastar::make_shared(std::move(factory), operations_gate), schema, query::full_partition_range, schema->full_slice(), @@ -2320,7 +2325,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { } assertions.produces_end_of_stream(); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } @@ -2402,7 +2407,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mf.as_clustering_row().key().equal(*s.schema(), ckey)); } - do_with_cql_env([=, s = std::move(s)] (cql_test_env& env) mutable -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([=, &operations_gate, s = std::move(s)] (cql_test_env& env) mutable -> future<> { auto factory = [=, gs = global_simple_schema(s)] ( schema_ptr, const dht::partition_range& range, @@ -2427,14 +2434,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mut_opt); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), true), + seastar::make_shared(std::move(factory), operations_gate, true), s.schema(), query::full_partition_range, s.schema()->full_slice(), service::get_local_sstable_query_read_priority())) .produces_partition(*mut_opt); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } @@ -2448,7 +2455,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { return; } - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get(); @@ -2490,7 +2499,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { streamed_mutation::forwarding::no, fwd_mr); }; auto reference_reader = make_filtering_reader( - make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory)), + make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory), operations_gate), schema, partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()), [&remote_partitioner] (const dht::decorated_key& pkey) { return remote_partitioner.shard_of(pkey.token()) == 0; @@ -2514,7 +2523,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { assert_that(tested_muts[i]).is_equal_to(reference_muts[i]); } - return make_ready_future<>(); + return operations_gate.close(); }).get(); } diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 2e56c17a1b..5513d8fbf3 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -677,21 +677,21 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { db::no_timeout).get(); auto& semaphore = cf.read_concurrency_semaphore(); + auto permit = semaphore.make_permit(); BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0); // Drain all resources of the semaphore - std::vector permits; const auto resources = semaphore.available_resources(); - permits.reserve(resources.count); - const auto per_permit_memory = resources.memory / resources.count; + const auto per_count_memory = resources.memory / resources.count; - for (int i = 0; i < resources.count; ++i) { - permits.emplace_back(semaphore.wait_admission(per_permit_memory, db::no_timeout).get0()); + auto units = permit.wait_admission(per_count_memory, db::no_timeout).get0(); + for (int i = 0; i < resources.count - 1; ++i) { + units.add(permit.wait_admission(per_count_memory, db::no_timeout).get0()); } BOOST_CHECK_EQUAL(semaphore.available_resources().count, 0); - BOOST_CHECK(semaphore.available_resources().memory < per_permit_memory); + BOOST_CHECK(semaphore.available_resources().memory < per_count_memory); auto cmd2 = query::read_command(s->id(), s->version(), @@ -749,12 +749,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { test_querier_cache t; auto& sem = t.get_semaphore(); + auto permit = sem.make_permit(); - auto permit1 = sem.consume_resources(reader_concurrency_semaphore::resources(sem.available_resources().count, 0)); + auto resources = permit.consume_resources(reader_resources(sem.available_resources().count, 0)); BOOST_CHECK_EQUAL(sem.available_resources().count, 0); - auto permit2_fut = sem.wait_admission(1, db::no_timeout); + auto fut = permit.wait_admission(1, db::no_timeout); BOOST_CHECK_EQUAL(sem.waiters(), 1); @@ -764,7 +765,7 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { .no_drops() .resource_based_evictions(); - permit1.release(); + resources.reset(); - permit2_fut.get(); + fut.get(); } diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 1bba4a0b50..bddf51fef7 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -26,6 +26,55 @@ class test_reader_lifecycle_policy : public reader_lifecycle_policy , public enable_shared_from_this { +public: + class operations_gate { + public: + class operation { + gate* _g = nullptr; + + private: + void leave() { + if (_g) { + _g->leave(); + } + } + + public: + operation() = default; + explicit operation(gate& g) : _g(&g) { _g->enter(); } + operation(const operation&) = delete; + operation(operation&& o) : _g(std::exchange(o._g, nullptr)) { } + ~operation() { leave(); } + operation& operator=(const operation&) = delete; + operation& operator=(operation&& o) { + leave(); + _g = std::exchange(o._g, nullptr); + return *this; + } + }; + + private: + std::vector _gates; + + public: + operations_gate() + : _gates(smp::count) { + } + + operation enter() { + return operation(_gates[this_shard_id()]); + } + + future<> close() { + return parallel_for_each(boost::irange(smp::count), [this] (shard_id shard) { + return smp::submit_to(shard, [this, shard] { + return _gates[shard].close(); + }); + }); + } + }; + +private: using factory_function = std::function; - struct reader_params { - const dht::partition_range range; - const query::partition_slice slice; - }; struct reader_context { - foreign_ptr> semaphore; - foreign_ptr> params; + std::unique_ptr semaphore; + operations_gate::operation op; + std::optional permit; + std::optional> wait_future; + std::optional range; + std::optional slice; + + reader_context(dht::partition_range range, query::partition_slice slice) : range(std::move(range)), slice(std::move(slice)) { + } }; factory_function _factory_function; - std::vector _contexts; + operations_gate& _operation_gate; + std::vector>> _contexts; + std::vector> _destroy_futures; bool _evict_paused_readers = false; public: - explicit test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false) + explicit test_reader_lifecycle_policy(factory_function f, operations_gate& g, bool evict_paused_readers = false) : _factory_function(std::move(f)) + , _operation_gate(g) , _contexts(smp::count) , _evict_paused_readers(evict_paused_readers) { } @@ -61,33 +116,47 @@ public: tracing::trace_state_ptr trace_state, mutation_reader::forwarding fwd_mr) override { const auto shard = this_shard_id(); - _contexts[shard].params = make_foreign(std::make_unique(reader_params{range, slice})); - return _factory_function(std::move(schema), _contexts[shard].params->range, _contexts[shard].params->slice, pc, - std::move(trace_state), fwd_mr); + if (_contexts[shard]) { + _contexts[shard]->range.emplace(range); + _contexts[shard]->slice.emplace(slice); + } else { + _contexts[shard] = make_foreign(std::make_unique(range, slice)); + } + _contexts[shard]->op = _operation_gate.enter(); + return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } virtual void destroy_reader(shard_id shard, future reader) noexcept override { - // Move to the background. + // Move to the background, waited via _operation_gate (void)reader.then([shard, this] (stopped_reader&& reader) { return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = std::move(_contexts[shard])] () mutable { - ctx.semaphore->unregister_inactive_read(std::move(*handle)); + ctx->semaphore->unregister_inactive_read(std::move(*handle)); + ctx->semaphore->broken(std::make_exception_ptr(broken_semaphore{})); + if (ctx->wait_future) { + return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { + f.ignore_ready_future(); + ctx->permit.reset(); // make sure it's destroyed before the semaphore + }); + } + return make_ready_future<>(); }); }).finally([zis = shared_from_this()] {}); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); - if (!_contexts[shard].semaphore) { + if (!_contexts[shard]->semaphore) { if (_evict_paused_readers) { - _contexts[shard].semaphore = make_foreign(std::make_unique(0, std::numeric_limits::max(), - format("reader_concurrency_semaphore @shard_id={}", shard))); - // Add a waiter, so that all registered inactive reads are - // immediately evicted. - // We don't care about the returned future. - (void)_contexts[shard].semaphore->wait_admission(1, db::no_timeout); + _contexts[shard]->semaphore = std::make_unique(0, std::numeric_limits::max(), + format("reader_concurrency_semaphore @shard_id={}", shard)); + _contexts[shard]->permit = _contexts[shard]->semaphore->make_permit(); + // Add a waiter, so that all registered inactive reads are + // immediately evicted. + // We don't care about the returned future. + _contexts[shard]->wait_future = _contexts[shard]->permit->wait_admission(1, db::no_timeout); } else { - _contexts[shard].semaphore = make_foreign(std::make_unique(reader_concurrency_semaphore::no_limits{})); + _contexts[shard]->semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}); } } - return *_contexts[shard].semaphore; + return *_contexts[shard]->semaphore; } }; diff --git a/test/manual/sstable_scan_footprint_test.cc b/test/manual/sstable_scan_footprint_test.cc index f7b107a444..b640580c94 100644 --- a/test/manual/sstable_scan_footprint_test.cc +++ b/test/manual/sstable_scan_footprint_test.cc @@ -186,7 +186,8 @@ void execute_reads(reader_concurrency_semaphore& sem, unsigned reads, unsigned c if (sem.waiters()) { testlog.trace("Waiting for queue to drain"); - sem.wait_admission(1, db::no_timeout).get(); + auto permit = sem.make_permit(); + permit.wait_admission(1, db::no_timeout).get(); } }