test: move away from reader_concurrency_semaphore::wait_admission()

And use the reader_permit for this instead. This refactoring has
revealed a pre-existing bug in the `test_lifecycle_policy`, which is
also addressed in this patch. The bug is that said policy executes
reader destructions in the background, and these are not waited for. For
some reason, the semaphore -> permit transition pushes these races over
the edge and we start seeing some of these destruction fibers still
being unfinished when test scopes are exited, causing all sorts of
trouble. The solution is to introduce a special gate that tests can use
to wait for all background work to finish before the test scope is
exited.
This commit is contained in:
Botond Dénes
2020-05-18 12:25:20 +03:00
parent bf4ade8917
commit a08467da29
6 changed files with 139 additions and 58 deletions

View File

@@ -51,10 +51,11 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
// It has to be a container that does not invalidate pointers
std::list<dummy_sharder> keep_alive_sharder;
test_reader_lifecycle_policy::operations_gate operations_gate;
do_with_cql_env([&keep_alive_sharder] (cql_test_env& env) -> future<> {
auto make_populate = [&keep_alive_sharder, &env] (bool evict_paused_readers, bool single_fragment_buffer) {
return [&keep_alive_sharder, &env, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
do_with_cql_env([&] (cql_test_env& env) -> future<> {
auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) {
return [&, evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
// We need to group mutations that have the same token so they land on the same shard.
std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;
@@ -84,7 +85,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
}
keep_alive_sharder.push_back(sharder);
return mutation_source([&keep_alive_sharder, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
return mutation_source([&, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -107,7 +108,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
return reader;
};
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, evict_paused_readers);
auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, range, slice, pc, trace_state, fwd_mr);
if (fwd_sm == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(mr));
@@ -126,6 +127,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
run_mutation_source_tests(make_populate(true, true));
return make_ready_future<>();
return operations_gate.close();
}).get();
}

View File

@@ -1025,9 +1025,9 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
auto resources = sem.available_resources();
resources -= reader_concurrency_semaphore::resources{1, 0};
auto permit = sem.consume_resources(resources);
auto permit = sem.make_permit();
return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions).finally([permit = std::move(permit)] {});
return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions).finally([units = permit.consume_resources(resources)] {});
}).handle_exception([seed] (std::exception_ptr e) {
testlog.error("Test workload failed with exception {}."
" To repeat this particular run, replace the random seed of the test, with that of this run ({})."

View File

@@ -1085,8 +1085,7 @@ class dummy_file_impl : public file_impl {
SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
return async([&] {
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
// Testing the tracker here, no need to have a base cost.
auto permit = semaphore.wait_admission(0, db::no_timeout).get0();
auto permit = semaphore.make_permit();
{
auto tracked_file = make_tracked_file(file(shared_ptr<file_impl>(make_shared<dummy_file_impl>())), permit);
@@ -1958,7 +1957,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
return;
}
do_with_cql_env([] (cql_test_env& env) -> future<> {
test_reader_lifecycle_policy::operations_gate operations_gate;
do_with_cql_env([&] (cql_test_env& env) -> future<> {
std::vector<std::atomic<bool>> shards_touched(smp::count);
simple_schema s;
auto factory = [&shards_touched] (
@@ -1973,7 +1974,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
};
assert_that(make_multishard_combining_reader(
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate),
s.schema(),
query::full_partition_range,
s.schema()->full_slice(),
@@ -1984,7 +1985,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
BOOST_REQUIRE(shards_touched.at(i));
}
return make_ready_future<>();
return operations_gate.close();
}).get();
}
@@ -2179,7 +2180,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
return;
}
do_with_cql_env([] (cql_test_env& env) -> future<> {
test_reader_lifecycle_policy::operations_gate operations_gate;
do_with_cql_env([&] (cql_test_env& env) -> future<> {
auto remote_controls = std::vector<foreign_ptr<std::unique_ptr<puppet_reader::control>>>();
remote_controls.reserve(smp::count);
for (unsigned i = 0; i < smp::count; ++i) {
@@ -2222,7 +2225,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
{
dummy_sharder sharder(s.schema()->get_sharder(), std::move(pkeys_by_tokens));
auto reader = make_multishard_combining_reader_for_tests(sharder, seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
auto reader = make_multishard_combining_reader_for_tests(sharder, seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate),
s.schema(), query::full_partition_range, s.schema()->full_slice(), service::get_local_sstable_query_read_priority());
reader.fill_buffer(db::no_timeout).get();
BOOST_REQUIRE(reader.is_buffer_full());
@@ -2244,12 +2247,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
std::logical_and<bool>()).get0();
}));
return make_ready_future<>();
return operations_gate.close();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
do_with_cql_env([] (cql_test_env& env) -> future<> {
test_reader_lifecycle_policy::operations_gate operations_gate;
do_with_cql_env([&] (cql_test_env& env) -> future<> {
env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks"
" WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get();
env.execute_cql("CREATE TABLE multishard_combining_reader_next_partition_ks.test (pk int, v int, PRIMARY KEY(pk));").get();
@@ -2300,7 +2305,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
return reader;
};
auto reader = make_multishard_combining_reader(
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate),
schema,
query::full_partition_range,
schema->full_slice(),
@@ -2320,7 +2325,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
}
assertions.produces_end_of_stream();
return make_ready_future<>();
return operations_gate.close();
}).get();
}
@@ -2402,7 +2407,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic
BOOST_REQUIRE(mf.as_clustering_row().key().equal(*s.schema(), ckey));
}
do_with_cql_env([=, s = std::move(s)] (cql_test_env& env) mutable -> future<> {
test_reader_lifecycle_policy::operations_gate operations_gate;
do_with_cql_env([=, &operations_gate, s = std::move(s)] (cql_test_env& env) mutable -> future<> {
auto factory = [=, gs = global_simple_schema(s)] (
schema_ptr,
const dht::partition_range& range,
@@ -2427,14 +2434,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic
BOOST_REQUIRE(mut_opt);
assert_that(make_multishard_combining_reader(
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), true),
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, true),
s.schema(),
query::full_partition_range,
s.schema()->full_slice(),
service::get_local_sstable_query_read_priority()))
.produces_partition(*mut_opt);
return make_ready_future<>();
return operations_gate.close();
}).get();
}
@@ -2448,7 +2455,9 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
return;
}
do_with_cql_env([] (cql_test_env& env) -> future<> {
test_reader_lifecycle_policy::operations_gate operations_gate;
do_with_cql_env([&] (cql_test_env& env) -> future<> {
env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get();
env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get();
@@ -2490,7 +2499,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
streamed_mutation::forwarding::no, fwd_mr);
};
auto reference_reader = make_filtering_reader(
make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory)),
make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory), operations_gate),
schema, partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()),
[&remote_partitioner] (const dht::decorated_key& pkey) {
return remote_partitioner.shard_of(pkey.token()) == 0;
@@ -2514,7 +2523,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
assert_that(tested_muts[i]).is_equal_to(reference_muts[i]);
}
return make_ready_future<>();
return operations_gate.close();
}).get();
}

View File

@@ -677,21 +677,21 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
db::no_timeout).get();
auto& semaphore = cf.read_concurrency_semaphore();
auto permit = semaphore.make_permit();
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0);
// Drain all resources of the semaphore
std::vector<reader_permit> permits;
const auto resources = semaphore.available_resources();
permits.reserve(resources.count);
const auto per_permit_memory = resources.memory / resources.count;
const auto per_count_memory = resources.memory / resources.count;
for (int i = 0; i < resources.count; ++i) {
permits.emplace_back(semaphore.wait_admission(per_permit_memory, db::no_timeout).get0());
auto units = permit.wait_admission(per_count_memory, db::no_timeout).get0();
for (int i = 0; i < resources.count - 1; ++i) {
units.add(permit.wait_admission(per_count_memory, db::no_timeout).get0());
}
BOOST_CHECK_EQUAL(semaphore.available_resources().count, 0);
BOOST_CHECK(semaphore.available_resources().memory < per_permit_memory);
BOOST_CHECK(semaphore.available_resources().memory < per_count_memory);
auto cmd2 = query::read_command(s->id(),
s->version(),
@@ -749,12 +749,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
test_querier_cache t;
auto& sem = t.get_semaphore();
auto permit = sem.make_permit();
auto permit1 = sem.consume_resources(reader_concurrency_semaphore::resources(sem.available_resources().count, 0));
auto resources = permit.consume_resources(reader_resources(sem.available_resources().count, 0));
BOOST_CHECK_EQUAL(sem.available_resources().count, 0);
auto permit2_fut = sem.wait_admission(1, db::no_timeout);
auto fut = permit.wait_admission(1, db::no_timeout);
BOOST_CHECK_EQUAL(sem.waiters(), 1);
@@ -764,7 +765,7 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
.no_drops()
.resource_based_evictions();
permit1.release();
resources.reset();
permit2_fut.get();
fut.get();
}

View File

@@ -26,6 +26,55 @@
class test_reader_lifecycle_policy
: public reader_lifecycle_policy
, public enable_shared_from_this<test_reader_lifecycle_policy> {
public:
// Tracks background operations (e.g. deferred reader destruction) across
// all shards, so tests can wait for them to drain before the test scope
// is exited. One seastar::gate per shard; close() closes them all.
class operations_gate {
public:
    // RAII handle representing one in-flight background operation on the
    // current shard. Enters the shard's gate on construction, leaves it on
    // destruction. Move-only.
    class operation {
        gate* _g = nullptr; // non-owning; nullptr means "not holding a gate"
    private:
        // Leave the gate if held. Nulls _g so a repeated call is a no-op.
        void leave() {
            if (_g) {
                _g->leave();
                _g = nullptr;
            }
        }
    public:
        operation() = default;
        explicit operation(gate& g) : _g(&g) { _g->enter(); }
        operation(const operation&) = delete;
        operation(operation&& o) noexcept : _g(std::exchange(o._g, nullptr)) { }
        ~operation() { leave(); }
        operation& operator=(const operation&) = delete;
        operation& operator=(operation&& o) noexcept {
            // Guard against self-move: without it we would leave() the very
            // gate we are about to adopt, causing a double leave() later.
            if (this != &o) {
                leave();
                _g = std::exchange(o._g, nullptr);
            }
            return *this;
        }
    };
private:
    std::vector<gate> _gates; // one gate per shard, indexed by shard id
public:
    operations_gate()
        : _gates(smp::count) {
    }
    // Register a new background operation on the calling shard.
    operation enter() {
        return operation(_gates[this_shard_id()]);
    }
    // Close every shard's gate, waiting for all registered operations to
    // finish. Must be called on shard 0 semantics of the caller; each
    // gate is closed on its own shard via submit_to.
    future<> close() {
        return parallel_for_each(boost::irange(smp::count), [this] (shard_id shard) {
            return smp::submit_to(shard, [this, shard] {
                return _gates[shard].close();
            });
        });
    }
};
private:
using factory_function = std::function<flat_mutation_reader(
schema_ptr,
const dht::partition_range&,
@@ -34,22 +83,28 @@ class test_reader_lifecycle_policy
tracing::trace_state_ptr,
mutation_reader::forwarding)>;
struct reader_params {
const dht::partition_range range;
const query::partition_slice slice;
};
struct reader_context {
foreign_ptr<std::unique_ptr<reader_concurrency_semaphore>> semaphore;
foreign_ptr<std::unique_ptr<const reader_params>> params;
std::unique_ptr<reader_concurrency_semaphore> semaphore;
operations_gate::operation op;
std::optional<reader_permit> permit;
std::optional<future<reader_permit::resource_units>> wait_future;
std::optional<const dht::partition_range> range;
std::optional<const query::partition_slice> slice;
reader_context(dht::partition_range range, query::partition_slice slice) : range(std::move(range)), slice(std::move(slice)) {
}
};
factory_function _factory_function;
std::vector<reader_context> _contexts;
operations_gate& _operation_gate;
std::vector<foreign_ptr<std::unique_ptr<reader_context>>> _contexts;
std::vector<future<>> _destroy_futures;
bool _evict_paused_readers = false;
public:
explicit test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false)
explicit test_reader_lifecycle_policy(factory_function f, operations_gate& g, bool evict_paused_readers = false)
: _factory_function(std::move(f))
, _operation_gate(g)
, _contexts(smp::count)
, _evict_paused_readers(evict_paused_readers) {
}
@@ -61,33 +116,47 @@ public:
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) override {
const auto shard = this_shard_id();
_contexts[shard].params = make_foreign(std::make_unique<const reader_params>(reader_params{range, slice}));
return _factory_function(std::move(schema), _contexts[shard].params->range, _contexts[shard].params->slice, pc,
std::move(trace_state), fwd_mr);
if (_contexts[shard]) {
_contexts[shard]->range.emplace(range);
_contexts[shard]->slice.emplace(slice);
} else {
_contexts[shard] = make_foreign(std::make_unique<reader_context>(range, slice));
}
_contexts[shard]->op = _operation_gate.enter();
return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr);
}
virtual void destroy_reader(shard_id shard, future<stopped_reader> reader) noexcept override {
// Move to the background.
// Move to the background, waited via _operation_gate
(void)reader.then([shard, this] (stopped_reader&& reader) {
return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = std::move(_contexts[shard])] () mutable {
ctx.semaphore->unregister_inactive_read(std::move(*handle));
ctx->semaphore->unregister_inactive_read(std::move(*handle));
ctx->semaphore->broken(std::make_exception_ptr(broken_semaphore{}));
if (ctx->wait_future) {
return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future<reader_permit::resource_units> f) mutable {
f.ignore_ready_future();
ctx->permit.reset(); // make sure it's destroyed before the semaphore
});
}
return make_ready_future<>();
});
}).finally([zis = shared_from_this()] {});
}
virtual reader_concurrency_semaphore& semaphore() override {
const auto shard = this_shard_id();
if (!_contexts[shard].semaphore) {
if (!_contexts[shard]->semaphore) {
if (_evict_paused_readers) {
_contexts[shard].semaphore = make_foreign(std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max(),
format("reader_concurrency_semaphore @shard_id={}", shard)));
// Add a waiter, so that all registered inactive reads are
// immediately evicted.
// We don't care about the returned future.
(void)_contexts[shard].semaphore->wait_admission(1, db::no_timeout);
_contexts[shard]->semaphore = std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max(),
format("reader_concurrency_semaphore @shard_id={}", shard));
_contexts[shard]->permit = _contexts[shard]->semaphore->make_permit();
// Add a waiter, so that all registered inactive reads are
// immediately evicted.
// We don't care about the returned future.
_contexts[shard]->wait_future = _contexts[shard]->permit->wait_admission(1, db::no_timeout);
} else {
_contexts[shard].semaphore = make_foreign(std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}));
_contexts[shard]->semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
}
}
return *_contexts[shard].semaphore;
return *_contexts[shard]->semaphore;
}
};

View File

@@ -186,7 +186,8 @@ void execute_reads(reader_concurrency_semaphore& sem, unsigned reads, unsigned c
if (sem.waiters()) {
testlog.trace("Waiting for queue to drain");
sem.wait_admission(1, db::no_timeout).get();
auto permit = sem.make_permit();
permit.wait_admission(1, db::no_timeout).get();
}
}