Files
scylladb/test/boost/querier_cache_test.cc
Botond Dénes dbe70cddca test/boost/querier_cache_test: make test_time_based_cache_eviction less sensitive to timing
This test relies on the cache entry being evicted after 200ms past the
TTL. This may not happen on a busy CI machine. Make the test less
reliant on timing by using eventually_true().
Simplify the test by dropping the second entry, as it doesn't add anything
to the test.

Fixes: SCYLLADB-811

Closes scylladb/scylladb#28958
2026-03-17 10:32:23 +01:00

916 lines
34 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <algorithm>
#include "replica/querier.hh"
#include "mutation_query.hh"
#include "reader_concurrency_semaphore.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/exception_utils.hh"
#include "test/lib/eventually.hh"
#include "db/config.hh"
#include <fmt/ranges.h>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/closeable.hh>
#include "readers/from_mutations.hh"
#include "readers/empty.hh"
// All test cases in this file belong to this suite; it is closed by
// BOOST_AUTO_TEST_SUITE_END() at the end of the file.
BOOST_AUTO_TEST_SUITE(querier_cache_test)
// Brings the std::chrono duration literals (24h, 1s, 900s, ...) into scope.
using namespace std::chrono_literals;
// Result builder that discards the content of a page and only records where
// consumption stopped: the last partition key seen and, within that
// partition, the last clustering key seen (if any). Used to drive
// querier::consume_page() in the tests below.
class dummy_result_builder {
    std::optional<dht::decorated_key> _last_pk;
    std::optional<clustering_key_prefix> _last_ck;

public:
    // Starts from an empty partition key (with a default token) and an empty
    // clustering key prefix.
    dummy_result_builder()
        : _last_pk({dht::token(), partition_key::make_empty()})
        , _last_ck(clustering_key_prefix::make_empty()) {
    }

    // A new partition starts: remember its key and forget the clustering
    // position reached in the previous partition.
    void consume_new_partition(const dht::decorated_key& dk) {
        _last_pk = dk;
        _last_ck.reset();
    }

    void consume(tombstone) {
    }

    stop_iteration consume(static_row&&, tombstone, bool) {
        return stop_iteration::no;
    }

    stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
        _last_ck = cr.key();
        return stop_iteration::no;
    }

    stop_iteration consume(range_tombstone_change&&) {
        return stop_iteration::no;
    }

    stop_iteration consume_end_of_partition() {
        return stop_iteration::no;
    }

    // Hands the recorded position over to the caller (moved out).
    std::pair<std::optional<dht::decorated_key>, std::optional<clustering_key_prefix>> consume_end_of_stream() {
        return {std::move(_last_pk), std::move(_last_ck)};
    }
};
// Test fixture bundling a replica::querier_cache with its own
// reader_concurrency_semaphore and a fixed set of 10 partitions (built by
// make_mutations()). It offers helpers to read a first page and save the
// querier into the cache, to look queriers up again, and to assert on the
// cache's stats counters. The assertion helpers return *this so they can be
// chained.
class test_querier_cache {
public:
    using bound = interval_bound<std::size_t>;

    // Maximum buffer size of the readers created by the fixture's mutation source.
    static const size_t max_reader_buffer_size = 8 * 1024;

private:
    // Expected values of the querier_cache::stats counters. The assertion
    // helpers (no_misses(), misses(), drops(), time_based_evictions(), ...)
    // update these and compare them with the cache's actual stats.
    replica::querier_cache::stats _expected_stats;
    simple_schema _s;
    reader_concurrency_semaphore _sem;
    replica::querier_cache _cache;
    const utils::chunked_vector<mutation> _mutations;
    const mutation_source _mutation_source;

    // Default row value generator: "value" followed by `i` zero-padded to 10 digits.
    static sstring make_value(size_t i) {
        return format("value{:010d}", i);
    }

    // Builds 10 partitions (pkeys 0..9), each with a static row and 4
    // clustering rows (ckeys 0..3) whose values come from `make_value`.
    // The result is sorted by decorated key.
    static utils::chunked_vector<mutation> make_mutations(simple_schema& s, const noncopyable_function<sstring(size_t)>& make_value) {
        utils::chunked_vector<mutation> mutations;
        mutations.reserve(10);
        for (uint32_t i = 0; i != 10; ++i) {
            auto mut = mutation(s.schema(), s.make_pkey(i));
            s.add_static_row(mut, "-");
            s.add_row(mut, s.make_ckey(0), make_value(0));
            s.add_row(mut, s.make_ckey(1), make_value(1));
            s.add_row(mut, s.make_ckey(2), make_value(2));
            s.add_row(mut, s.make_ckey(3), make_value(3));
            mutations.emplace_back(std::move(mut));
        }
        std::ranges::sort(mutations, [] (const mutation& a, const mutation& b) {
            return a.decorated_key().tri_compare(*a.schema(), b.decorated_key()) < 0;
        });
        return mutations;
    }

    // Creates a querier over the fixture's mutation source, using a
    // tracking-only permit from the fixture's semaphore. Note: the querier
    // always reads the schema's full slice.
    template <typename Querier>
    Querier make_querier(const dht::partition_range& range, db::timeout_clock::time_point timeout) {
        return Querier(_mutation_source,
                _s.schema(),
                _sem.make_tracking_only_permit(_s.schema(), "make-querier", timeout, {}),
                range,
                _s.schema()->full_slice(),
                nullptr,
                tombstone_gc_state::no_gc());
    }

    static query_id make_cache_key(unsigned key) {
        return query_id(utils::UUID{key, 1});
    }

    // Returns the key of the partition `partition_offset` positions after the
    // first partition (of the fixture's mutation set) at or after the start
    // of `range`. The offset is clamped to the last partition of the set.
    // Returns nullptr if no partition is at or after the start of `range`.
    // The end of `range` is not taken into account.
    const dht::decorated_key* find_key(const dht::partition_range& range, unsigned partition_offset) const {
        const auto& s = *_s.schema();
        const auto less_cmp = dht::ring_position_less_comparator(s);
        const auto begin = _mutations.begin();
        const auto end = _mutations.end();
        const auto start_position = range.start() ?
            dht::ring_position_view::for_range_start(range) :
            dht::ring_position_view(_mutations.begin()->decorated_key());
        const auto it = std::lower_bound(begin, end, start_position, [&] (const mutation& m, const dht::ring_position_view& k) {
            return less_cmp(m.ring_position(), k);
        });
        if (it == end) {
            return nullptr;
        }
        // `it != end`, so at least one partition remains. Clamp to the last
        // one: `it + std::distance(it, end)` is `end` and must not be dereferenced.
        const auto last = std::distance(it, end) - 1;
        const auto offset = std::min<std::ptrdiff_t>(partition_offset, last);
        const auto& mut = *(it + offset);
        return &mut.decorated_key();
    }

public:
    // What was saved into the cache, plus the range and slice a lookup for
    // the next page is expected to use.
    struct entry_info {
        reader_permit permit;
        unsigned key;
        dht::partition_range original_range;
        query::partition_slice original_slice;
        uint64_t row_limit;
        dht::partition_range expected_range;
        query::partition_slice expected_slice;
    };

    // `external_make_value` generates the clustering row values of the
    // fixture's mutations. `max_memory` is the memory limit of the fixture's
    // semaphore. When `is_user_semaphore` is empty, every semaphore is
    // considered a user semaphore.
    test_querier_cache(const noncopyable_function<sstring(size_t)>& external_make_value, std::chrono::seconds entry_ttl = 24h,
            ssize_t max_memory = std::numeric_limits<ssize_t>::max(), replica::querier_cache::is_user_semaphore_func is_user_semaphore = {})
        : _sem(reader_concurrency_semaphore::for_tests{}, "test_querier_cache", std::numeric_limits<int>::max(), max_memory)
        , _cache(is_user_semaphore ? std::move(is_user_semaphore) : [] (const reader_concurrency_semaphore&) { return true; }, entry_ttl)
        , _mutations(make_mutations(_s, external_make_value))
        , _mutation_source([this] (schema_ptr schema, reader_permit permit, const dht::partition_range& range) {
            auto rd = make_mutation_reader_from_mutations(schema, std::move(permit), _mutations, range);
            rd.set_max_buffer_size(max_reader_buffer_size);
            return rd;
        }) {
    }

    explicit test_querier_cache(std::chrono::seconds entry_ttl = 24h)
        : test_querier_cache(test_querier_cache::make_value, entry_ttl) {
    }

    test_querier_cache(replica::querier_cache::is_user_semaphore_func is_user_semaphore)
        : test_querier_cache(test_querier_cache::make_value, 24h, std::numeric_limits<ssize_t>::max(), std::move(is_user_semaphore))
    { }

    ~test_querier_cache() {
        _cache.stop().get();
        _sem.stop().get();
    }

    const simple_schema& get_simple_schema() const {
        return _s;
    }

    simple_schema& get_simple_schema() {
        return _s;
    }

    const schema_ptr get_schema() const {
        return _s.schema();
    }

    reader_concurrency_semaphore& get_semaphore() {
        return _sem;
    }

    const replica::querier_cache::stats& get_stats() const {
        return _cache.get_stats();
    }

    // Range between the `begin`-th and `end`-th partitions of the fixture's mutation set.
    dht::partition_range make_partition_range(bound begin, bound end) const {
        return dht::partition_range::make({_mutations.at(begin.value()).decorated_key(), begin.is_inclusive()},
                {_mutations.at(end.value()).decorated_key(), end.is_inclusive()});
    }

    dht::partition_range make_singular_partition_range(std::size_t i) const {
        return dht::partition_range::make_singular(_mutations.at(i).decorated_key());
    }

    // Inclusive range covering all partitions of the fixture's mutation set.
    dht::partition_range make_default_partition_range() const {
        return make_partition_range({0, true}, {_mutations.size() - 1, true});
    }

    const query::partition_slice& make_default_slice() const {
        return _s.schema()->full_slice();
    }

    // Reads the first page of `range` (stopping after `row_limit` rows) with
    // a new querier, saves the querier into the cache under `key` through
    // `insert_mem_ptr`, checks where the read stopped, and returns what a
    // lookup for the next page is expected to use. Note: the querier reads
    // the full slice; `slice` is only used to compute `expected_slice`.
    template <typename Querier>
    entry_info produce_first_page_and_save_querier(void(replica::querier_cache::*insert_mem_ptr)(query_id, Querier&&, tracing::trace_state_ptr), unsigned key,
            const dht::partition_range& range, const query::partition_slice& slice, uint64_t row_limit, db::timeout_clock::time_point timeout = db::no_timeout) {
        const auto cache_key = make_cache_key(key);

        auto querier = make_querier<Querier>(range, timeout);
        auto dk_ck = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(), gc_clock::now()).get();
        auto&& dk = dk_ck.first;
        auto&& ck = dk_ck.second;
        auto permit = querier.permit();
        auto insert_fn = std::mem_fn(insert_mem_ptr);
        insert_fn(_cache, cache_key, std::move(querier), nullptr);

        // Either no keys at all (nothing read) or at least partition key.
        BOOST_REQUIRE((dk && ck) || !ck);

        // Check that the read stopped at the correct position.
        // There are 5 rows in each mutation (1 static + 4 clustering).
        const auto* expected_key = find_key(range, row_limit / 5);
        if (!expected_key) {
            BOOST_REQUIRE(!dk);
            BOOST_REQUIRE(!ck);
        } else {
            BOOST_REQUIRE(dk->equal(*_s.schema(), *expected_key));
        }

        // The next page resumes from the partition where this page stopped.
        auto expected_range = [&] {
            if (range.is_singular() || !dk) {
                return range;
            }
            return dht::partition_range(dht::partition_range::bound(*dk, true), range.end());
        }();

        // The next page resumes after the clustering row where this page stopped.
        auto expected_slice = [&] {
            if (!ck) {
                return slice;
            }
            auto expected_slice = slice;
            auto cr = query::clustering_range::make_starting_with({*ck, false});
            expected_slice.set_range(*_s.schema(), dk->key(), {std::move(cr)});
            return expected_slice;
        }();

        // `range` and `slice` are const references: they are copied either way.
        return {std::move(permit), key, range, slice, row_limit, std::move(expected_range), std::move(expected_slice)};
    }

    entry_info produce_first_page_and_save_data_querier(unsigned key, const dht::partition_range& range,
            const query::partition_slice& slice, uint64_t row_limit = 5) {
        return produce_first_page_and_save_querier<replica::querier>(&replica::querier_cache::insert_data_querier, key, range, slice, row_limit);
    }

    entry_info produce_first_page_and_save_data_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5) {
        return produce_first_page_and_save_data_querier(key, range, make_default_slice(), row_limit);
    }

    // Singular overload
    entry_info produce_first_page_and_save_data_querier(unsigned key, std::size_t i, uint64_t row_limit = 5) {
        return produce_first_page_and_save_data_querier(key, make_singular_partition_range(i), _s.schema()->full_slice(), row_limit);
    }

    // Use the whole range
    entry_info produce_first_page_and_save_data_querier(unsigned key) {
        return produce_first_page_and_save_data_querier(key, make_default_partition_range(), _s.schema()->full_slice());
    }

    // For tests testing just one insert-lookup.
    entry_info produce_first_page_and_save_data_querier() {
        return produce_first_page_and_save_data_querier(1);
    }

    entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range,
            const query::partition_slice& slice, uint64_t row_limit = 5, db::timeout_clock::time_point timeout = db::no_timeout) {
        return produce_first_page_and_save_querier<replica::querier>(&replica::querier_cache::insert_mutation_querier, key, range, slice, row_limit, timeout);
    }

    entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5,
            db::timeout_clock::time_point timeout = db::no_timeout) {
        return produce_first_page_and_save_mutation_querier(key, range, make_default_slice(), row_limit, timeout);
    }

    // Singular overload
    entry_info produce_first_page_and_save_mutation_querier(unsigned key, std::size_t i, uint64_t row_limit = 5,
            db::timeout_clock::time_point timeout = db::no_timeout) {
        // Forward `timeout` like the other overloads do.
        return produce_first_page_and_save_mutation_querier(key, make_singular_partition_range(i), _s.schema()->full_slice(), row_limit, timeout);
    }

    // Use the whole range
    entry_info produce_first_page_and_save_mutation_querier(unsigned key, db::timeout_clock::time_point timeout = db::no_timeout) {
        return produce_first_page_and_save_mutation_querier(key, make_default_partition_range(), _s.schema()->full_slice(), 5, timeout);
    }

    // For tests testing just one insert-lookup.
    entry_info produce_first_page_and_save_mutation_querier(db::timeout_clock::time_point timeout = db::no_timeout) {
        return produce_first_page_and_save_mutation_querier(1, timeout);
    }

    // Looks up a data querier by `lookup_key` using semaphore `sem`, closes it
    // if found, and checks that the lookup was counted.
    test_querier_cache& assert_cache_lookup_data_querier(unsigned lookup_key,
            const schema& lookup_schema,
            const dht::partition_range& lookup_range,
            const query::partition_slice& lookup_slice,
            reader_concurrency_semaphore& sem) {

        auto querier_opt = _cache.lookup_data_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, sem, nullptr, db::no_timeout);
        if (querier_opt) {
            querier_opt->close().get();
        }
        BOOST_REQUIRE_EQUAL(_cache.get_stats().lookups, ++_expected_stats.lookups);
        return *this;
    }

    // As above, using the fixture's own semaphore.
    test_querier_cache& assert_cache_lookup_data_querier(unsigned lookup_key,
            const schema& lookup_schema,
            const dht::partition_range& lookup_range,
            const query::partition_slice& lookup_slice) {
        return assert_cache_lookup_data_querier(lookup_key, lookup_schema, lookup_range, lookup_slice, get_semaphore());
    }

    // Looks up a mutation querier by `lookup_key` with `timeout`, closes it
    // if found, and checks that the lookup was counted.
    test_querier_cache& assert_cache_lookup_mutation_querier(unsigned lookup_key,
            const schema& lookup_schema,
            const dht::partition_range& lookup_range,
            const query::partition_slice& lookup_slice,
            db::timeout_clock::time_point timeout = db::no_timeout) {

        auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, timeout);
        if (querier_opt) {
            querier_opt->close().get();
        }
        BOOST_REQUIRE_EQUAL(_cache.get_stats().lookups, ++_expected_stats.lookups);
        return *this;
    }

    // Stat assertions: the no_*() variants check that a counter did not
    // change; the others check that it grew by exactly one.
    test_querier_cache& no_misses() {
        BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, _expected_stats.misses);
        return *this;
    }

    test_querier_cache& misses() {
        BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, ++_expected_stats.misses);
        return *this;
    }

    test_querier_cache& no_drops() {
        BOOST_REQUIRE_EQUAL(_cache.get_stats().drops, _expected_stats.drops);
        return *this;
    }

    test_querier_cache& drops() {
        BOOST_REQUIRE_EQUAL(_cache.get_stats().drops, ++_expected_stats.drops);
        return *this;
    }

    test_querier_cache& no_evictions() {
        BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, _expected_stats.time_based_evictions);
        BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions);
        return *this;
    }

    test_querier_cache& time_based_evictions() {
        BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, ++_expected_stats.time_based_evictions);
        BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions);
        return *this;
    }

    test_querier_cache& resource_based_evictions() {
        BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, _expected_stats.time_based_evictions);
        BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, ++_expected_stats.resource_based_evictions);
        return *this;
    }
};
// A lookup under a key nothing was saved with must miss.
SEASTAR_THREAD_TEST_CASE(lookup_with_wrong_key_misses) {
    test_querier_cache cache;
    const auto saved = cache.produce_first_page_and_save_data_querier();
    constexpr unsigned unknown_key = 90;
    cache.assert_cache_lookup_data_querier(unknown_key, *cache.get_schema(), saved.expected_range, saved.expected_slice)
        .misses()
        .no_drops()
        .no_evictions();
}
// A saved data querier is not returned by a mutation-querier lookup (miss),
// but a data-querier lookup afterwards still finds it.
SEASTAR_THREAD_TEST_CASE(lookup_data_querier_as_mutation_querier_misses) {
    test_querier_cache cache;
    const auto saved = cache.produce_first_page_and_save_data_querier();
    const auto schema = cache.get_schema();

    cache.assert_cache_lookup_mutation_querier(saved.key, *schema, saved.expected_range, saved.expected_slice)
        .misses()
        .no_drops()
        .no_evictions();

    cache.assert_cache_lookup_data_querier(saved.key, *schema, saved.expected_range, saved.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
// A saved mutation querier is not returned by a data-querier lookup (miss),
// but a mutation-querier lookup afterwards still finds it.
SEASTAR_THREAD_TEST_CASE(lookup_mutation_querier_as_data_querier_misses) {
    test_querier_cache cache;
    const auto saved = cache.produce_first_page_and_save_mutation_querier();
    const auto schema = cache.get_schema();

    cache.assert_cache_lookup_data_querier(saved.key, *schema, saved.expected_range, saved.expected_slice)
        .misses()
        .no_drops()
        .no_evictions();

    cache.assert_cache_lookup_mutation_querier(saved.key, *schema, saved.expected_range, saved.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
// A data querier and a mutation querier saved under the same key coexist:
// each kind of lookup finds its own querier.
SEASTAR_THREAD_TEST_CASE(data_and_mutation_querier_can_coexist) {
    test_querier_cache cache;
    const auto data = cache.produce_first_page_and_save_data_querier(1);
    const auto mut = cache.produce_first_page_and_save_mutation_querier(1);
    const auto schema = cache.get_schema();

    cache.assert_cache_lookup_data_querier(data.key, *schema, data.expected_range, data.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();

    cache.assert_cache_lookup_mutation_querier(mut.key, *schema, mut.expected_range, mut.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
/*
* Range matching tests
*/
// Singular range, first page limited to 2 rows: a lookup with the expected
// range and slice hits.
SEASTAR_THREAD_TEST_CASE(singular_range_lookup_with_stop_at_clustering_row) {
    test_querier_cache cache;
    const uint64_t row_limit = 2;
    const auto saved = cache.produce_first_page_and_save_data_querier(1, cache.make_singular_partition_range(1), row_limit);
    cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
// Singular range, first page limited to 1 row: a lookup with the expected
// range and slice hits.
SEASTAR_THREAD_TEST_CASE(singular_range_lookup_with_stop_at_static_row) {
    test_querier_cache cache;
    const uint64_t row_limit = 1;
    const auto saved = cache.produce_first_page_and_save_data_querier(1, cache.make_singular_partition_range(1), row_limit);
    cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
// Range [1, 3), first page limited to 3 rows: a lookup with the expected
// range and slice hits.
SEASTAR_THREAD_TEST_CASE(lookup_with_stop_at_clustering_row) {
    test_querier_cache cache;
    const uint64_t row_limit = 3;
    const auto saved = cache.produce_first_page_and_save_data_querier(1, cache.make_partition_range({1, true}, {3, false}), row_limit);
    cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
// Range [1, 3), first page limited to 1 row: a lookup with the expected
// range and slice hits.
SEASTAR_THREAD_TEST_CASE(lookup_with_stop_at_static_row) {
    test_querier_cache cache;
    const uint64_t row_limit = 1;
    const auto saved = cache.produce_first_page_and_save_data_querier(1, cache.make_partition_range({1, true}, {3, false}), row_limit);
    cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
/*
* Drop tests
*/
// Looking a querier up with the range of the first page, instead of the
// range expected for the next page, must drop it.
SEASTAR_THREAD_TEST_CASE(lookup_with_original_range_drops) {
    test_querier_cache cache;
    const auto saved = cache.produce_first_page_and_save_data_querier(1);
    const auto& stale_range = saved.original_range;
    cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), stale_range, saved.expected_slice)
        .no_misses()
        .drops()
        .no_evictions();
}
// Looking a querier up with a slice other than the one expected for its next
// page must drop it.
SEASTAR_THREAD_TEST_CASE(lookup_with_wrong_slice_drops) {
    test_querier_cache cache;
    const auto schema = cache.get_schema();
    const auto range = cache.make_partition_range({1, false}, {3, true});

    // Looks `e` up with its expected range but with `slice`, expecting a drop.
    const auto lookup_expecting_drop = [&cache, &schema] (const test_querier_cache::entry_info& e, const query::partition_slice& slice) {
        cache.assert_cache_lookup_data_querier(e.key, *schema, e.expected_range, slice)
            .no_misses()
            .drops()
            .no_evictions();
    };

    // Swap slices for different clustering keys.
    const auto after_3_rows = cache.produce_first_page_and_save_data_querier(1, range, 3);
    const auto after_4_rows = cache.produce_first_page_and_save_data_querier(2, range, 4);
    lookup_expecting_drop(after_3_rows, after_4_rows.expected_slice);
    lookup_expecting_drop(after_4_rows, after_3_rows.expected_slice);

    // Wrong slice.
    const auto whole_range = cache.produce_first_page_and_save_data_querier(3);
    lookup_expecting_drop(whole_range, schema->full_slice());

    // Swap slices for stopped at clustering/static row.
    const auto after_1_row = cache.produce_first_page_and_save_data_querier(4, range, 1);
    const auto after_2_rows = cache.produce_first_page_and_save_data_querier(5, range, 2);
    lookup_expecting_drop(after_1_row, after_2_rows.expected_slice);
    lookup_expecting_drop(after_2_rows, after_1_row.expected_slice);
}
// Looking a querier up with a different schema version (an extra column
// added) must drop it.
SEASTAR_THREAD_TEST_CASE(lookup_with_different_schema_version_drops) {
    test_querier_cache cache;
    const auto altered_schema = schema_builder(cache.get_schema())
        .with_column("v1", utf8_type)
        .build();
    const auto saved = cache.produce_first_page_and_save_data_querier();
    cache.assert_cache_lookup_data_querier(saved.key, *altered_schema, saved.expected_range, saved.expected_slice)
        .no_misses()
        .drops()
        .no_evictions();
}
/*
* Eviction tests
*/
// A cached querier is evicted once its TTL (1s here) expires. A later lookup
// misses, and the evicted querier's inactive read is unregistered.
SEASTAR_THREAD_TEST_CASE(test_time_based_cache_eviction) {
    test_querier_cache cache(1s);

    const auto entry = cache.produce_first_page_and_save_data_querier(1);
    BOOST_REQUIRE_EQUAL(cache.get_stats().time_based_evictions, 0);

    // The entry cannot expire before the TTL is up, so don't start polling earlier.
    sleep(1s).get();

    // Eviction may lag behind the TTL on a busy machine, so poll for it
    // instead of relying on a fixed delay.
    eventually_true([&cache] {
        return cache.get_stats().time_based_evictions == 1;
    });

    cache.assert_cache_lookup_data_querier(entry.key, *cache.get_schema(), entry.expected_range, entry.expected_slice)
        .misses()
        .no_drops()
        .time_based_evictions();

    // There should be no inactive reads, the querier_cache should unregister
    // the expired queriers.
    BOOST_REQUIRE_EQUAL(cache.get_semaphore().get_stats().inactive_reads, 0);
}
// Returns a string of `size` characters, each drawn uniformly at random from
// the lowercase ASCII letters 'a'..'z' using the test-local random engine.
sstring make_string_blob(size_t size) {
    // All 26 letters. The distribution's upper bound is derived from the
    // table, so it can never select the terminating NUL.
    static constexpr char letters[] = "abcdefghijklmnopqrstuvwxyz";
    static constexpr size_t letter_count = sizeof(letters) - 1; // exclude the terminating NUL
    auto& re = seastar::testing::local_random_engine;
    // uniform_int_distribution's range is inclusive on both ends.
    std::uniform_int_distribution<size_t> dist(0, letter_count - 1);

    sstring s;
    s.resize(size);
    for (size_t i = 0; i < size; ++i) {
        s[i] = letters[dist(re)];
    }
    return s;
}
// Fills the cache up to the semaphore's memory limit. One more insert must
// evict an entry (resource-based eviction), so the number of registered
// inactive reads stays the same.
SEASTAR_THREAD_TEST_CASE(test_memory_based_cache_eviction) {
    const auto cache_size = 1 << 20;
    test_querier_cache cache([] (size_t) {
        const size_t blob_size = 1 << 10; // 1K
        return make_string_blob(blob_size);
    }, 24h, cache_size);

    size_t next_key = 0;
    auto entry = cache.produce_first_page_and_save_data_querier(next_key++);
    auto& sem = entry.permit.semaphore();
    const auto entry_size = entry.permit.consumed_resources().memory;

    // Fill the cache but don't overflow.
    for (; sem.available_resources().memory > entry_size; ++next_key) {
        cache.produce_first_page_and_save_data_querier(next_key);
    }

    const auto inactive_reads_before = cache.get_semaphore().get_stats().inactive_reads;

    // Should overflow the limit and thus be evicted instantly.
    entry = cache.produce_first_page_and_save_data_querier(next_key++);
    cache.assert_cache_lookup_data_querier(entry.key, *cache.get_schema(), entry.expected_range, entry.expected_slice)
        .resource_based_evictions()
        .misses()
        .no_drops();

    // Since the last insert should have evicted an existing entry, we should
    // have the same number of registered inactive reads.
    BOOST_REQUIRE_EQUAL(cache.get_semaphore().get_stats().inactive_reads, inactive_reads_before);
}
// Resource-based eviction through a full cql_test_env. A querier saved in the
// database's querier cache by a first read is expected to be evicted when a
// second read runs while all of the semaphore's resources are consumed.
SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
auto db_cfg_ptr = make_shared<db::config>();
auto& db_cfg = *db_cfg_ptr;
// Run with the row cache and the commitlog disabled.
db_cfg.enable_cache(false);
db_cfg.enable_commitlog(false);
do_with_cql_env_thread([] (cql_test_env& env) {
using namespace std::chrono_literals;
auto& db = env.local_db();
// Long TTL so that the cached querier is not evicted based on time during the test.
db.set_querier_cache_entry_ttl(24h);
// Start from a fresh keyspace: drop it if it already exists
// (find_keyspace() throws no_such_keyspace when it does not).
try {
db.find_keyspace("querier_cache");
env.execute_cql("drop keyspace querier_cache;").get();
} catch (const replica::no_such_keyspace&) {
// expected
}
env.execute_cql("CREATE KEYSPACE querier_cache WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1};").get();
env.execute_cql("CREATE TABLE querier_cache.test (pk int, ck int, value int, primary key (pk, ck));").get();
BOOST_REQUIRE(env.local_db().has_schema("querier_cache", "test"));
// Populate a single partition (pk = 0) with 100 rows (ck = value = 0..99).
auto insert_id = env.prepare("INSERT INTO querier_cache.test (pk, ck, value) VALUES (?, ?, ?);").get();
auto pk = cql3::raw_value::make_value(serialized(0));
for (int i = 0; i < 100; ++i) {
auto ck = cql3::raw_value::make_value(serialized(i));
env.execute_prepared(insert_id, {{pk, ck, ck}}).get();
}
BOOST_REQUIRE(env.local_db().has_schema("querier_cache", "test"));
auto& cf = db.find_column_family("querier_cache", "test");
auto s = cf.schema();
cf.flush().get();
// Allow the reads below to return short results.
auto slice = s->full_slice();
slice.options.set<query::partition_slice::option::allow_short_read>();
// First page of a read limited to 1 row and 1 partition.
auto cmd1 = query::read_command(s->id(),
s->version(),
slice,
query::max_result_size(1024 * 1024),
query::tombstone_limit::max,
query::row_limit(1),
query::partition_limit(1),
gc_clock::now(),
std::nullopt,
query_id::create_random_id(),
query::is_first_page::yes);
// Should save the querier in cache.
db.query_mutations(s,
cmd1,
query::full_partition_range,
nullptr,
db::no_timeout).get();
auto& semaphore = db.get_reader_concurrency_semaphore();
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0);
// Drain all resources of the semaphore
// NOTE(review): `sponge_permit` and `consumed_resources` stay alive until the
// end of this lambda, presumably keeping the semaphore drained for the reads below.
auto sponge_permit = semaphore.make_tracking_only_permit(s, "sponge", db::no_timeout, {});
auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources());
// A second, different read (new random query id, not a first page) with the same limits.
auto cmd2 = query::read_command(s->id(),
s->version(),
slice,
query::max_result_size(1024 * 1024),
query::tombstone_limit::max,
query::row_limit(1),
query::partition_limit(1),
gc_clock::now(),
std::nullopt,
query_id::create_random_id(),
query::is_first_page::no);
// Should evict the already cached querier.
db.query_mutations(s,
cmd2,
query::full_partition_range,
nullptr,
db::no_timeout).get();
// The second read might be evicted too if it consumes more
// memory than the first and hence triggers memory control when
// saved in the querier cache.
BOOST_CHECK_GE(db.get_querier_cache_stats().resource_based_evictions, 1);
// We want to read the entire partition so that the querier
// is not saved at the end and thus ensure it is destroyed.
// We cannot leave scope with the querier still in the cache
// as that sadly leads to use-after-free as the database's
// resource_concurrency_semaphore will be destroyed before some
// of the tracked buffers.
cmd2.set_row_limit(query::max_rows);
cmd2.partition_limit = query::max_partitions;
cmd2.max_result_size.emplace(query::result_memory_limiter::unlimited_result_size);
db.query_mutations(s,
cmd2,
query::full_partition_range,
nullptr,
db::no_timeout).get();
return make_ready_future<>();
}, std::move(db_cfg_ptr)).get();
}
// When the semaphore already has a waiter, an inserted querier is evicted
// right away (resource-based eviction), so the lookup misses.
SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
    test_querier_cache cache;
    auto& sem = cache.get_semaphore();

    // Take a permit and consume all remaining count resources with it, so
    // that the next permit request has to wait.
    auto blocking_permit = sem.obtain_permit(cache.get_schema(), get_name(), 0, db::no_timeout, {}).get();
    auto all_count = blocking_permit.consume_resources(reader_resources(sem.available_resources().count, 0));
    BOOST_CHECK_EQUAL(sem.available_resources().count, 0);

    auto queued_permit = sem.obtain_permit(cache.get_schema(), get_name(), 1, db::no_timeout, {});
    BOOST_CHECK_EQUAL(sem.get_stats().waiters, 1);

    const auto entry = cache.produce_first_page_and_save_mutation_querier();
    cache.assert_cache_lookup_mutation_querier(entry.key, *cache.get_schema(), entry.expected_range, entry.expected_slice)
        .misses()
        .no_drops()
        .resource_based_evictions();

    // Release the consumed count so the queued permit request can complete.
    all_count.reset_to_zero();
    queued_permit.get();
}
// Unregistering an inactive read through a semaphore other than the one that
// registered it must fail with a runtime_error. An empty handle is simply
// not found.
SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
    reader_concurrency_semaphore sem1(reader_concurrency_semaphore::no_limits{}, "sem1", reader_concurrency_semaphore::register_metrics::no);
    auto stop_sem1 = deferred_stop(sem1);
    reader_concurrency_semaphore sem2(reader_concurrency_semaphore::no_limits{}, "", reader_concurrency_semaphore::register_metrics::no); // to see the message for an unnamed semaphore
    auto stop_sem2 = deferred_stop(sem2);

    auto schema = schema_builder("ks", "cf")
        .with_column("pk", int32_type, column_kind::partition_key)
        .with_column("v", int32_type)
        .build();

    auto sem1_h1 = sem1.register_inactive_read(make_empty_mutation_reader(schema, sem1.make_tracking_only_permit(schema, get_name(), db::no_timeout, {})));
    auto sem2_h1 = sem2.register_inactive_read(make_empty_mutation_reader(schema, sem2.make_tracking_only_permit(schema, get_name(), db::no_timeout, {})));

    // Sanity check that lookup still works with empty handle.
    BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{}));

    // Make internal errors throw instead of aborting, and restore the previous
    // setting (rather than assuming it was `true`) when leaving the test.
    const bool previous_abort_on_internal_error = set_abort_on_internal_error(false);
    auto restore_abort_on_internal_error = defer([previous_abort_on_internal_error] {
        set_abort_on_internal_error(previous_abort_on_internal_error);
    });
    BOOST_REQUIRE_THROW(sem1.unregister_inactive_read(std::move(sem2_h1)), std::runtime_error);
    BOOST_REQUIRE_THROW(sem2.unregister_inactive_read(std::move(sem1_h1)), std::runtime_error);
}
// Looking a querier up with a semaphore other than the one it was created
// with must drop it. When the other semaphore is not a "user" semaphore, an
// internal error ("semaphore mismatch detected, dropping reader") is raised.
SEASTAR_THREAD_TEST_CASE(test_semaphore_mismatch) {
    reader_concurrency_semaphore other_sem(reader_concurrency_semaphore::no_limits{}, "other_semaphore", reader_concurrency_semaphore::register_metrics::no);
    auto stop_other_sem = deferred_stop(other_sem);

    // Decides whether `other_sem` counts as a user semaphore; every other
    // semaphore always does.
    bool other_is_user_semaphore = true;
    auto is_user_semaphore_func = [&] (const reader_concurrency_semaphore& sem) {
        return &sem != &other_sem || other_is_user_semaphore;
    };

    test_querier_cache cache(is_user_semaphore_func);
    auto& own_sem = cache.get_semaphore();

    // Same semaphore
    {
        const auto saved = cache.produce_first_page_and_save_data_querier();
        cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice, own_sem)
            .no_misses()
            .no_drops()
            .no_evictions();
    }

    // Other semaphore, other is a "user" semaphore
    {
        const auto saved = cache.produce_first_page_and_save_data_querier();
        cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice, other_sem)
            .no_misses()
            .drops()
            .no_evictions();
    }

    // Other semaphore, other is not a "user" semaphore
    {
        const bool previous_abort_on_internal_error = set_abort_on_internal_error(false);
        auto restore_abort_on_internal_error = defer([previous_abort_on_internal_error] {
            set_abort_on_internal_error(previous_abort_on_internal_error);
        });
        other_is_user_semaphore = false;
        const auto saved = cache.produce_first_page_and_save_data_querier();
        BOOST_REQUIRE_EXCEPTION(cache.assert_cache_lookup_data_querier(saved.key, *cache.get_schema(), saved.expected_range, saved.expected_slice, other_sem),
                std::runtime_error,
                exception_predicate::message_contains("semaphore mismatch detected, dropping reader"));
        cache.no_misses()
            .drops()
            .no_evictions();
    }
}
// Timeout (and cache TTL) used by the timeout/TTL tests below. Debug builds
// use a longer value.
#if SEASTAR_DEBUG
static constexpr std::chrono::seconds ttl_timeout_test_timeout{4};
#else
static constexpr std::chrono::seconds ttl_timeout_test_timeout{1};
#endif
// The timeout of the read that produced the first page must not stick to the
// cached querier: after that timeout has passed, the querier is still found.
SEASTAR_THREAD_TEST_CASE(test_timeout_not_sticky_on_insert) {
    test_querier_cache cache;
    const auto first_page_timeout = db::timeout_clock::now() + ttl_timeout_test_timeout;
    const auto entry = cache.produce_first_page_and_save_mutation_querier(first_page_timeout);

    sleep(ttl_timeout_test_timeout * 2).get();

    cache.assert_cache_lookup_mutation_querier(entry.key, *cache.get_schema(), entry.expected_range, entry.expected_slice)
        .no_misses()
        .no_drops()
        .no_evictions();
}
// After a lookup with a new timeout, the permit of the looked-up querier must
// use that timeout, not the cache TTL: waiting past the TTL must not abort it.
SEASTAR_THREAD_TEST_CASE(test_ttl_not_sticky_on_lookup) {
    test_querier_cache cache(ttl_timeout_test_timeout);
    auto& sem = cache.get_semaphore();
    auto held_permit = sem.obtain_permit(cache.get_schema(), get_name(), 1024, db::no_timeout, {}).get();

    const auto entry = cache.produce_first_page_and_save_mutation_querier();

    // A lookup timeout far beyond the cache TTL.
    const auto lookup_timeout = db::timeout_clock::now() + 900s;
    cache.assert_cache_lookup_mutation_querier(entry.key, *cache.get_schema(), entry.expected_range, entry.expected_slice, lookup_timeout)
        .no_misses()
        .no_drops()
        .no_evictions();
    BOOST_REQUIRE(entry.permit.timeout() == lookup_timeout);

    sleep(ttl_timeout_test_timeout * 2).get();

    // get_abort_exception() will contain the timeout exception if the permit timed out due to sticky TTL during the above sleep.
    BOOST_REQUIRE(!entry.permit.get_abort_exception());
}
// The timeout passed to a lookup is applied to the querier's permit: once it
// passes, the permit carries a named_semaphore_timed_out abort exception.
SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
    test_querier_cache cache;
    auto& sem = cache.get_semaphore();
    auto held_permit = sem.obtain_permit(cache.get_schema(), get_name(), 1024, db::no_timeout, {}).get();

    const auto entry = cache.produce_first_page_and_save_mutation_querier();

    const auto lookup_timeout = db::timeout_clock::now() + ttl_timeout_test_timeout;
    cache.assert_cache_lookup_mutation_querier(entry.key, *cache.get_schema(), entry.expected_range, entry.expected_slice, lookup_timeout)
        .no_misses()
        .no_drops()
        .no_evictions();
    BOOST_REQUIRE(entry.permit.timeout() == lookup_timeout);
    BOOST_REQUIRE(!entry.permit.get_abort_exception());

    sleep(ttl_timeout_test_timeout * 2).get();

    const auto abort_exception = entry.permit.get_abort_exception();
    BOOST_REQUIRE(abort_exception);
    BOOST_REQUIRE_THROW(std::rethrow_exception(abort_exception), seastar::named_semaphore_timed_out);
}
BOOST_AUTO_TEST_SUITE_END()