From a08467da29dde5753c77a40eb65e6b60eb0ef120 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 18 May 2020 12:25:20 +0300 Subject: [PATCH] test: move away from reader_concurrency_semaphore::wait_admission() And use the reader_permit for this instead. This refactoring has revealed a pre-existing bug in the `test_reader_lifecycle_policy`, which is also addressed in this patch. The bug is that said policy executes reader destructions in the background, and these are not waited for. For some reason, the semaphore -> permit transition pushes these races over the edge and we start seeing some of these destruction fibers still unfinished when test scopes are exited, causing all sorts of trouble. The solution is to introduce a special gate that tests can use to wait for all background work to finish, before the test scope is exited. --- ...ombining_reader_as_mutation_source_test.cc | 13 +- test/boost/multishard_mutation_query_test.cc | 4 +- test/boost/mutation_reader_test.cc | 43 ++++--- test/boost/querier_cache_test.cc | 21 ++-- test/lib/reader_lifecycle_policy.hh | 113 ++++++++++++++---- test/manual/sstable_scan_footprint_test.cc | 3 +- 6 files changed, 139 insertions(+), 58 deletions(-) diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc index b2d489437f..0b8ee7c24a 100644 --- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc +++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc @@ -51,10 +51,11 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { // It has to be a container that does not invalidate pointers std::list keep_alive_sharder; + test_reader_lifecycle_policy::operations_gate operations_gate; - do_with_cql_env([&keep_alive_sharder] (cql_test_env& env) -> future<> { - auto make_populate = [&keep_alive_sharder, &env] (bool evict_paused_readers, bool single_fragment_buffer) { - 
return [&keep_alive_sharder, &env, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector& mutations) mutable { + do_with_cql_env([&] (cql_test_env& env) -> future<> { + auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) { + return [&, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector& mutations) mutable { // We need to group mutations that have the same token so they land on the same shard. std::map> mutations_by_token; @@ -84,7 +85,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { } keep_alive_sharder.push_back(sharder); - return mutation_source([&keep_alive_sharder, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s, + return mutation_source([&, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -107,7 +108,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { return reader; }; - auto lifecycle_policy = seastar::make_shared(std::move(factory), evict_paused_readers); + auto lifecycle_policy = seastar::make_shared(std::move(factory), operations_gate, evict_paused_readers); auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, range, slice, pc, trace_state, fwd_mr); if (fwd_sm == streamed_mutation::forwarding::yes) { return make_forwardable(std::move(mr)); @@ -126,6 +127,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)"); run_mutation_source_tests(make_populate(true, true)); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } diff --git a/test/boost/multishard_mutation_query_test.cc b/test/boost/multishard_mutation_query_test.cc index 32f84bab67..5affa862ff 100644 --- 
a/test/boost/multishard_mutation_query_test.cc +++ b/test/boost/multishard_mutation_query_test.cc @@ -1025,9 +1025,9 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) { auto resources = sem.available_resources(); resources -= reader_concurrency_semaphore::resources{1, 0}; - auto permit = sem.consume_resources(resources); + auto permit = sem.make_permit(); - return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions).finally([permit = std::move(permit)] {}); + return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions).finally([units = permit.consume_resources(resources)] {}); }).handle_exception([seed] (std::exception_ptr e) { testlog.error("Test workload failed with exception {}." " To repeat this particular run, replace the random seed of the test, with that of this run ({})." diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 8863300030..dc817722a8 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -1085,8 +1085,7 @@ class dummy_file_impl : public file_impl { SEASTAR_TEST_CASE(reader_restriction_file_tracking) { return async([&] { reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name()); - // Testing the tracker here, no need to have a base cost. 
- auto permit = semaphore.wait_admission(0, db::no_timeout).get0(); + auto permit = semaphore.make_permit(); { auto tracked_file = make_tracked_file(file(shared_ptr(make_shared())), permit); @@ -1958,7 +1957,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { return; } - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; auto factory = [&shards_touched] ( @@ -1973,7 +1974,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { }; assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory)), + seastar::make_shared(std::move(factory), operations_gate), s.schema(), query::full_partition_range, s.schema()->full_slice(), @@ -1984,7 +1985,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { BOOST_REQUIRE(shards_touched.at(i)); } - return make_ready_future<>(); + return operations_gate.close(); }).get(); } @@ -2179,7 +2180,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending return; } - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { auto remote_controls = std::vector>>(); remote_controls.reserve(smp::count); for (unsigned i = 0; i < smp::count; ++i) { @@ -2222,7 +2225,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending { dummy_sharder sharder(s.schema()->get_sharder(), std::move(pkeys_by_tokens)); - auto reader = make_multishard_combining_reader_for_tests(sharder, seastar::make_shared(std::move(factory)), + auto reader = make_multishard_combining_reader_for_tests(sharder, seastar::make_shared(std::move(factory), operations_gate), s.schema(), query::full_partition_range, 
s.schema()->full_slice(), service::get_local_sstable_query_read_priority()); reader.fill_buffer(db::no_timeout).get(); BOOST_REQUIRE(reader.is_buffer_full()); @@ -2244,12 +2247,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending std::logical_and()).get0(); })); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks" " WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); env.execute_cql("CREATE TABLE multishard_combining_reader_next_partition_ks.test (pk int, v int, PRIMARY KEY(pk));").get(); @@ -2300,7 +2305,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { return reader; }; auto reader = make_multishard_combining_reader( - seastar::make_shared(std::move(factory)), + seastar::make_shared(std::move(factory), operations_gate), schema, query::full_partition_range, schema->full_slice(), @@ -2320,7 +2325,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { } assertions.produces_end_of_stream(); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } @@ -2402,7 +2407,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mf.as_clustering_row().key().equal(*s.schema(), ckey)); } - do_with_cql_env([=, s = std::move(s)] (cql_test_env& env) mutable -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([=, &operations_gate, s = std::move(s)] (cql_test_env& env) mutable -> future<> { auto factory = [=, gs = global_simple_schema(s)] ( schema_ptr, const dht::partition_range& range, @@ -2427,14 
+2434,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mut_opt); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), true), + seastar::make_shared(std::move(factory), operations_gate, true), s.schema(), query::full_partition_range, s.schema()->full_slice(), service::get_local_sstable_query_read_priority())) .produces_partition(*mut_opt); - return make_ready_future<>(); + return operations_gate.close(); }).get(); } @@ -2448,7 +2455,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { return; } - do_with_cql_env([] (cql_test_env& env) -> future<> { + test_reader_lifecycle_policy::operations_gate operations_gate; + + do_with_cql_env([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get(); @@ -2490,7 +2499,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { streamed_mutation::forwarding::no, fwd_mr); }; auto reference_reader = make_filtering_reader( - make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory)), + make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory), operations_gate), schema, partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()), [&remote_partitioner] (const dht::decorated_key& pkey) { return remote_partitioner.shard_of(pkey.token()) == 0; @@ -2514,7 +2523,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { assert_that(tested_muts[i]).is_equal_to(reference_muts[i]); } - return make_ready_future<>(); + return operations_gate.close(); }).get(); } diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 2e56c17a1b..5513d8fbf3 100644 --- a/test/boost/querier_cache_test.cc +++ 
b/test/boost/querier_cache_test.cc @@ -677,21 +677,21 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { db::no_timeout).get(); auto& semaphore = cf.read_concurrency_semaphore(); + auto permit = semaphore.make_permit(); BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0); // Drain all resources of the semaphore - std::vector permits; const auto resources = semaphore.available_resources(); - permits.reserve(resources.count); - const auto per_permit_memory = resources.memory / resources.count; + const auto per_count_memory = resources.memory / resources.count; - for (int i = 0; i < resources.count; ++i) { - permits.emplace_back(semaphore.wait_admission(per_permit_memory, db::no_timeout).get0()); + auto units = permit.wait_admission(per_count_memory, db::no_timeout).get0(); + for (int i = 0; i < resources.count - 1; ++i) { + units.add(permit.wait_admission(per_count_memory, db::no_timeout).get0()); } BOOST_CHECK_EQUAL(semaphore.available_resources().count, 0); - BOOST_CHECK(semaphore.available_resources().memory < per_permit_memory); + BOOST_CHECK(semaphore.available_resources().memory < per_count_memory); auto cmd2 = query::read_command(s->id(), s->version(), @@ -749,12 +749,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { test_querier_cache t; auto& sem = t.get_semaphore(); + auto permit = sem.make_permit(); - auto permit1 = sem.consume_resources(reader_concurrency_semaphore::resources(sem.available_resources().count, 0)); + auto resources = permit.consume_resources(reader_resources(sem.available_resources().count, 0)); BOOST_CHECK_EQUAL(sem.available_resources().count, 0); - auto permit2_fut = sem.wait_admission(1, db::no_timeout); + auto fut = permit.wait_admission(1, db::no_timeout); BOOST_CHECK_EQUAL(sem.waiters(), 1); @@ -764,7 +765,7 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { .no_drops() .resource_based_evictions(); - permit1.release(); + resources.reset(); - permit2_fut.get(); + 
fut.get(); } diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 1bba4a0b50..bddf51fef7 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -26,6 +26,55 @@ class test_reader_lifecycle_policy : public reader_lifecycle_policy , public enable_shared_from_this { +public: + class operations_gate { + public: + class operation { + gate* _g = nullptr; + + private: + void leave() { + if (_g) { + _g->leave(); + } + } + + public: + operation() = default; + explicit operation(gate& g) : _g(&g) { _g->enter(); } + operation(const operation&) = delete; + operation(operation&& o) : _g(std::exchange(o._g, nullptr)) { } + ~operation() { leave(); } + operation& operator=(const operation&) = delete; + operation& operator=(operation&& o) { + leave(); + _g = std::exchange(o._g, nullptr); + return *this; + } + }; + + private: + std::vector _gates; + + public: + operations_gate() + : _gates(smp::count) { + } + + operation enter() { + return operation(_gates[this_shard_id()]); + } + + future<> close() { + return parallel_for_each(boost::irange(smp::count), [this] (shard_id shard) { + return smp::submit_to(shard, [this, shard] { + return _gates[shard].close(); + }); + }); + } + }; + +private: using factory_function = std::function; - struct reader_params { - const dht::partition_range range; - const query::partition_slice slice; - }; struct reader_context { - foreign_ptr> semaphore; - foreign_ptr> params; + std::unique_ptr semaphore; + operations_gate::operation op; + std::optional permit; + std::optional> wait_future; + std::optional range; + std::optional slice; + + reader_context(dht::partition_range range, query::partition_slice slice) : range(std::move(range)), slice(std::move(slice)) { + } }; factory_function _factory_function; - std::vector _contexts; + operations_gate& _operation_gate; + std::vector>> _contexts; + std::vector> _destroy_futures; bool _evict_paused_readers = false; public: - explicit 
test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false) + explicit test_reader_lifecycle_policy(factory_function f, operations_gate& g, bool evict_paused_readers = false) : _factory_function(std::move(f)) + , _operation_gate(g) , _contexts(smp::count) , _evict_paused_readers(evict_paused_readers) { } @@ -61,33 +116,47 @@ public: tracing::trace_state_ptr trace_state, mutation_reader::forwarding fwd_mr) override { const auto shard = this_shard_id(); - _contexts[shard].params = make_foreign(std::make_unique(reader_params{range, slice})); - return _factory_function(std::move(schema), _contexts[shard].params->range, _contexts[shard].params->slice, pc, - std::move(trace_state), fwd_mr); + if (_contexts[shard]) { + _contexts[shard]->range.emplace(range); + _contexts[shard]->slice.emplace(slice); + } else { + _contexts[shard] = make_foreign(std::make_unique(range, slice)); + } + _contexts[shard]->op = _operation_gate.enter(); + return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } virtual void destroy_reader(shard_id shard, future reader) noexcept override { - // Move to the background. 
+ // Move to the background, waited via _operation_gate (void)reader.then([shard, this] (stopped_reader&& reader) { return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = std::move(_contexts[shard])] () mutable { - ctx.semaphore->unregister_inactive_read(std::move(*handle)); + ctx->semaphore->unregister_inactive_read(std::move(*handle)); + ctx->semaphore->broken(std::make_exception_ptr(broken_semaphore{})); + if (ctx->wait_future) { + return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { + f.ignore_ready_future(); + ctx->permit.reset(); // make sure it's destroyed before the semaphore + }); + } + return make_ready_future<>(); }); }).finally([zis = shared_from_this()] {}); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); - if (!_contexts[shard].semaphore) { + if (!_contexts[shard]->semaphore) { if (_evict_paused_readers) { - _contexts[shard].semaphore = make_foreign(std::make_unique(0, std::numeric_limits::max(), - format("reader_concurrency_semaphore @shard_id={}", shard))); - // Add a waiter, so that all registered inactive reads are - // immediately evicted. - // We don't care about the returned future. - (void)_contexts[shard].semaphore->wait_admission(1, db::no_timeout); + _contexts[shard]->semaphore = std::make_unique(0, std::numeric_limits::max(), + format("reader_concurrency_semaphore @shard_id={}", shard)); + _contexts[shard]->permit = _contexts[shard]->semaphore->make_permit(); + // Add a waiter, so that all registered inactive reads are + // immediately evicted. + // We don't care about the returned future. 
+ _contexts[shard]->wait_future = _contexts[shard]->permit->wait_admission(1, db::no_timeout); } else { - _contexts[shard].semaphore = make_foreign(std::make_unique(reader_concurrency_semaphore::no_limits{})); + _contexts[shard]->semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}); } } - return *_contexts[shard].semaphore; + return *_contexts[shard]->semaphore; } }; diff --git a/test/manual/sstable_scan_footprint_test.cc b/test/manual/sstable_scan_footprint_test.cc index f7b107a444..b640580c94 100644 --- a/test/manual/sstable_scan_footprint_test.cc +++ b/test/manual/sstable_scan_footprint_test.cc @@ -186,7 +186,8 @@ void execute_reads(reader_concurrency_semaphore& sem, unsigned reads, unsigned c if (sem.waiters()) { testlog.trace("Waiting for queue to drain"); - sem.wait_admission(1, db::no_timeout).get(); + auto permit = sem.make_permit(); + permit.wait_admission(1, db::no_timeout).get(); } }