Files
scylladb/test/boost/multishard_query_test.cc
Pavel Emelyanov 54edb44b20 code: Stop using seastar::compat::source_location
And switch to std::source_location.
Upcoming seastar update will deprecate its compatibility layer.

The patch is

  for f in $(git grep -l 'seastar::compat::source_location'); do
    sed -e 's/seastar::compat::source_location/std::source_location/g' -i $f;
  done

and removal of few header includes.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27309
2025-11-27 19:10:11 +02:00

1197 lines
49 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/assert.hh"
#include "replica/multishard_query.hh"
#include "schema/schema_registry.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "partition_slice_builder.hh"
#include "serializer_impl.hh"
#include "query/query-result-set.hh"
#include "mutation_query.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/eventually.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/random_schema.hh"
#include "tombstone_gc_extension.hh"
#include "db/tags/extension.hh"
#include "cdc/cdc_extension.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "db/per_partition_rate_limit_extension.hh"
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <fmt/ranges.h>
#include <utility>
#include <algorithm>
namespace {
// Creates a vnodes-based (tablets disabled) test keyspace and returns its name.
sstring create_vnodes_keyspace(cql_test_env& env) {
    static const char* const name = "ks_vnodes";
    env.execute_cql("CREATE KEYSPACE ks_vnodes"
            " WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
            " AND tablets = {'enabled': 'false'};").get();
    return name;
}
// Builds a cql_test_config whose db::config is equipped with every schema
// extension these tests may encounter in randomly generated schemas: tags,
// CDC, paxos grace seconds, tombstone GC and per-partition rate limit.
static cql_test_config cql_config_with_extensions() {
    auto extensions = std::make_shared<db::extensions>();
    extensions->add_schema_extension<db::tags_extension>(db::tags_extension::NAME);
    extensions->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
    extensions->add_schema_extension<db::paxos_grace_seconds_extension>(db::paxos_grace_seconds_extension::NAME);
    extensions->add_schema_extension<tombstone_gc_extension>(tombstone_gc_extension::NAME);
    extensions->add_schema_extension<db::per_partition_rate_limit_extension>(db::per_partition_rate_limit_extension::NAME);
    return cql_test_config(seastar::make_shared<db::config>(std::move(extensions)));
}
// Output of create_test_table(): the generated schema, the decorated
// partition key of every generated mutation, and each mutation in frozen form
// after compaction.
struct generated_table {
schema_ptr schema;
std::vector<dht::decorated_key> keys;
utils::chunked_vector<frozen_mutation> compacted_frozen_mutations;
};
// Random schema specification with a fixed keyspace/table name, delegating
// everything else to a stock random specification. When
// `force_clustering_column` is set, the clustering-column-count distribution's
// lower bound becomes 1, guaranteeing at least one clustering column.
class random_schema_specification : public tests::random_schema_specification {
    sstring _table_name;
    std::unique_ptr<tests::random_schema_specification> _underlying_spec;
public:
    random_schema_specification(sstring ks_name, sstring table_name, bool force_clustering_column)
        : tests::random_schema_specification(std::move(ks_name))
        , _table_name(std::move(table_name))
        , _underlying_spec(tests::make_random_schema_specification(
                keyspace_name(),
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(size_t(force_clustering_column), 4),
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(0, 4)))
    { }
    virtual sstring table_name(std::mt19937& engine) override { return _table_name; }
    virtual sstring udt_name(std::mt19937& engine) override { return _underlying_spec->udt_name(engine); }
    virtual std::vector<data_type> partition_key_columns(std::mt19937& engine) override { return _underlying_spec->partition_key_columns(engine); }
    virtual std::vector<data_type> clustering_key_columns(std::mt19937& engine) override { return _underlying_spec->clustering_key_columns(engine); }
    virtual std::vector<data_type> regular_columns(std::mt19937& engine) override { return _underlying_spec->regular_columns(engine); }
    virtual std::vector<data_type> static_columns(std::mt19937& engine) override { return _underlying_spec->static_columns(engine); }
    // Dropped the stray trailing semicolon the original had here (it was a
    // harmless empty declaration).
    virtual compress_sstable& compress() override { return _underlying_spec->compress(); }
};
} // anonymous namespace
// Generates a random table named `ks_name.tbl_name` (schema derived from
// `seed`), creates it via CQL, generates random mutations for it and applies
// them to the database, each on the shard owning its token. Returns the
// schema, the partition keys of all mutations and their compacted frozen
// forms.
//
// The distributions control how many partitions / clustering rows / range
// tombstones are generated; `ts_gen` supplies timestamps (see e.g.
// no_tombstone_timestamp_generator below).
static generated_table create_test_table(
cql_test_env& env,
uint32_t seed,
sstring ks_name,
sstring tbl_name,
bool force_clustering_column,
std::uniform_int_distribution<size_t> partitions,
std::uniform_int_distribution<size_t> clustering_rows,
std::uniform_int_distribution<size_t> range_tombstones,
tests::timestamp_generator ts_gen) {
auto random_schema_spec = std::make_unique<random_schema_specification>(ks_name, tbl_name, force_clustering_column);
auto random_schema = tests::random_schema(seed, *random_schema_spec);
testlog.info("\n{}", random_schema.cql());
random_schema.create_with_cql(env).get();
const auto mutations = tests::generate_random_mutations(
seed,
random_schema,
ts_gen,
tests::no_expiry_expiry_generator(),
partitions,
clustering_rows,
range_tombstones).get();
auto schema = random_schema.schema();
std::vector<dht::decorated_key> keys;
utils::chunked_vector<frozen_mutation> compacted_frozen_mutations;
keys.reserve(mutations.size());
compacted_frozen_mutations.reserve(mutations.size());
{
// The writes are dispatched concurrently (fire-and-forget under the gate);
// closing the gate below waits for all of them to complete.
gate write_gate;
for (const auto& mut : mutations) {
keys.emplace_back(mut.decorated_key());
compacted_frozen_mutations.emplace_back(freeze(mut.compacted()));
// Apply each mutation on the shard that owns its token. The mutation is
// frozen into the lambda so nothing dangles across the cross-shard call.
(void)with_gate(write_gate, [&] {
return smp::submit_to(dht::static_shard_of(*schema, mut.decorated_key().token()), [&env, gs = global_schema_ptr(schema), mut = freeze(mut)] () mutable {
return env.local_db().apply(gs.get(), std::move(mut), {}, db::commitlog_force_sync::no, db::no_timeout);
});
});
// Keep the reactor responsive while looping in this seastar thread.
thread::maybe_yield();
}
write_gate.close().get();
}
return {random_schema.schema(), keys, compacted_frozen_mutations};
}
// Timestamp generator that never produces tombstones: every tombstone
// destination gets api::missing_timestamp, while live data gets a uniformly
// random timestamp in [min_timestamp, api::max_timestamp].
api::timestamp_type no_tombstone_timestamp_generator(std::mt19937& engine, tests::timestamp_destination destination, api::timestamp_type min_timestamp) {
    using dest = tests::timestamp_destination;
    const bool is_tombstone = destination == dest::partition_tombstone
            || destination == dest::row_tombstone
            || destination == dest::collection_tombstone
            || destination == dest::range_tombstone;
    if (is_tombstone) {
        return api::missing_timestamp;
    }
    std::uniform_int_distribution<api::timestamp_type> ts_dist(min_timestamp, api::max_timestamp);
    return ts_dist(engine);
}
// Convenience wrapper: creates a table with a random seed, exactly
// `partition_count` partitions with `row_per_partition_count` rows each, no
// range tombstones and no tombstones at all; returns only the schema and the
// partition keys.
static std::pair<schema_ptr, std::vector<dht::decorated_key>> create_test_table(cql_test_env& env, sstring ks_name,
        sstring tbl_name, int partition_count = 10 * smp::count, int row_per_partition_count = 10) {
    const auto seed = tests::random::get_int<uint32_t>();
    auto generated = create_test_table(
            env,
            seed,
            std::move(ks_name),
            std::move(tbl_name),
            true,
            std::uniform_int_distribution<size_t>(partition_count, partition_count),
            std::uniform_int_distribution<size_t>(row_per_partition_count, row_per_partition_count),
            std::uniform_int_distribution<size_t>(0, 0),
            no_tombstone_timestamp_generator);
    return {std::move(generated.schema), std::move(generated.keys)};
}
// Sums the given querier-cache statistics member over all shards.
static uint64_t aggregate_querier_cache_stat(sharded<replica::database>& db, uint64_t replica::querier_cache::stats::*stat) {
    uint64_t total = 0;
    for (unsigned shard = 0; shard < smp::count; ++shard) {
        total += db.invoke_on(shard, [stat] (replica::database& local_db) {
            return local_db.get_querier_cache_stats().*stat;
        }).get();
    }
    return total;
}
// Asserts that the querier cache on every shard holds exactly `queriers`
// entries. The caller's source location is logged to make failures traceable.
static void check_cache_population(sharded<replica::database>& db, size_t queriers,
        std::source_location sl = std::source_location::current()) {
    testlog.info("{}() called from {}() {}:{:d}", __FUNCTION__, sl.function_name(), sl.file_name(), sl.line());
    for (unsigned shard = 0; shard < smp::count; ++shard) {
        db.invoke_on(shard, [queriers] (replica::database& local_db) {
            tests::require_equal(local_db.get_querier_cache_stats().population, queriers);
        }).get();
    }
}
// Requires that the aggregated querier-cache population across all shards
// eventually drops to zero (e.g. via TTL-based eviction).
static void require_eventually_empty_caches(sharded<replica::database>& db,
        std::source_location sl = std::source_location::current()) {
    testlog.info("{}() called from {}() {}:{:d}", __FUNCTION__, sl.function_name(), sl.file_name(), sl.line());
    tests::require(eventually_true([&db] {
        return aggregate_querier_cache_stat(db, &replica::querier_cache::stats::population) == 0;
    }));
}
BOOST_AUTO_TEST_SUITE(multishard_query_test)
// Best run with SMP>=2
// Starts a stateful paged read (non-null query id, row limit smaller than the
// data set) and then never asks for the next page. The suspended queriers
// saved in the per-shard caches must be evicted by the (shortened) TTL, so
// the caches eventually drain to zero.
SEASTAR_THREAD_TEST_CASE(test_abandoned_read) {
do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
using namespace std::chrono_literals;
// Short TTL so the abandoned queriers are evicted quickly.
env.db().invoke_on_all([] (replica::database& db) {
db.set_querier_cache_entry_ttl(1s);
}).get();
const auto ks = create_vnodes_keyspace(env);
auto [s, _] = create_test_table(env, ks, get_name());
(void)_;
auto cmd = query::read_command(
s->id(),
s->version(),
s->full_slice(),
query::max_result_size(query::result_memory_limiter::unlimited_result_size),
query::tombstone_limit::max,
query::row_limit(7),
query::partition_limit::max,
gc_clock::now(),
std::nullopt,
query_id::create_random_id(),
query::is_first_page::yes);
query_mutations_on_all_shards(env.db(), s, cmd, {query::full_partition_range}, nullptr, db::no_timeout).get();
require_eventually_empty_caches(env.db());
return make_ready_future<>();
}, cql_config_with_extensions()).get();
}
} // multishard_query_test namespace
// Reads each partition individually (one singular-range query per key), each
// on the shard that owns the key, and returns the unfrozen mutations in the
// same order as `pkeys`. Serves as the non-paged reference result the paged
// scans are compared against.
static utils::chunked_vector<mutation> read_all_partitions_one_by_one(sharded<replica::database>& db, schema_ptr s, std::vector<dht::decorated_key> pkeys,
const query::partition_slice& slice) {
const auto& sharder = s->get_sharder();
utils::chunked_vector<mutation> results;
results.reserve(pkeys.size());
for (const auto& pkey : pkeys) {
// Run the query on the owning shard; `async` provides the seastar-thread
// context the blocking `.get()` below needs.
const auto res = db.invoke_on(sharder.shard_for_reads(pkey.token()), [gs = global_schema_ptr(s), &pkey, &slice] (replica::database& db) {
return async([s = gs.get(), &pkey, &slice, &db] () mutable {
const auto cmd = query::read_command(s->id(), s->version(), slice,
query::max_result_size(query::result_memory_limiter::unlimited_result_size), query::tombstone_limit::max);
const auto range = dht::partition_range::make_singular(pkey);
return make_foreign(std::make_unique<reconcilable_result>(
std::get<0>(db.query_mutations(std::move(s), cmd, range, nullptr, db::no_timeout).get())));
});
}).get();
// A singular-range read must return exactly the queried partition.
tests::require_equal(res->partitions().size(), 1u);
results.emplace_back(res->partitions().front().mut().unfreeze(s));
}
return results;
}
// Overload reading with the schema's full (unrestricted) slice.
static utils::chunked_vector<mutation> read_all_partitions_one_by_one(sharded<replica::database>& db, schema_ptr s, std::vector<dht::decorated_key> pkeys) {
    const auto full = s->full_slice();
    return read_all_partitions_one_by_one(db, s, std::move(pkeys), full);
}
using stateful_query = bool_class<class stateful>;
// Emulates a coordinator-side paged scan over `original_ranges`/`slice`.
// `ResultBuilder` (mutation_result_builder or data_result_builder) issues the
// actual per-page query and accumulates results; `page_size` becomes the
// per-page row limit; `is_stateful` selects a non-null query id so shards
// cache suspended queriers between pages; `page_hook`, if provided, is called
// with the page index after each page that had more data.
// Returns the accumulated result and the number of pages fetched.
template <typename ResultBuilder>
static std::pair<typename ResultBuilder::end_result_type, size_t>
read_partitions_with_generic_paged_scan(sharded<replica::database>& db, schema_ptr s, uint32_t page_size, uint64_t max_size, stateful_query is_stateful,
const dht::partition_range_vector& original_ranges, const query::partition_slice& slice, const std::function<void(size_t)>& page_hook = {}) {
const auto query_uuid = is_stateful ? query_id::create_random_id() : query_id::create_null_id();
ResultBuilder res_builder(s, slice, page_size);
auto cmd = query::read_command(
s->id(),
s->version(),
slice,
query::max_result_size(max_size),
query::tombstone_limit::max,
query::row_limit(page_size),
query::partition_limit::max,
gc_clock::now(),
std::nullopt,
query_uuid,
query::is_first_page::yes);
bool has_more = true;
auto ranges = std::make_unique<dht::partition_range_vector>(original_ranges);
// First page is special, needs to have `is_first_page` set.
{
auto res = ResultBuilder::query(db, s, cmd, *ranges, nullptr, db::no_timeout);
has_more = res_builder.add(*res);
cmd.is_first_page = query::is_first_page::no;
if (page_hook && has_more) {
page_hook(0);
}
}
if (!has_more) {
return std::pair(std::move(res_builder).get_end_result(), 1);
}
const auto cmp = dht::ring_position_comparator(*s);
unsigned npages = 1;
// Rest of the pages. Loop until an empty page turns up. Not very
// sophisticated but simple and safe.
while (has_more) {
// Force freeing the vector to avoid hiding any bugs related to storing
// references to the ranges vector (which is not alive between pages in
// real life).
ranges = std::make_unique<dht::partition_range_vector>(original_ranges);
// Drop the ranges that lie entirely before the resume position.
while (!ranges->front().contains(res_builder.last_pkey(), cmp)) {
ranges->erase(ranges->begin());
}
SCYLLA_ASSERT(!ranges->empty());
// Re-include the last partition only if it was cut mid-way: it produced a
// clustering position to resume from and its per-partition row limit was
// not yet exhausted.
const auto pkrange_begin_inclusive = res_builder.last_ckey() && res_builder.last_pkey_rows() < slice.partition_row_limit();
auto range_opt = ranges->front().trim_front(dht::partition_range::bound(res_builder.last_pkey(), pkrange_begin_inclusive), dht::ring_position_comparator(*s));
if (range_opt) {
ranges->front() = std::move(*range_opt);
} else {
ranges->erase(ranges->begin());
if (ranges->empty()) {
break;
}
}
// When resuming inside a partition, restrict its clustering ranges to
// start after the last clustering key seen.
if (res_builder.last_ckey()) {
auto ckranges = cmd.slice.default_row_ranges();
query::trim_clustering_row_ranges_to(*s, ckranges, *res_builder.last_ckey());
cmd.slice.clear_range(*s, res_builder.last_pkey().key());
cmd.slice.clear_ranges();
cmd.slice.set_range(*s, res_builder.last_pkey().key(), std::move(ckranges));
}
auto res = ResultBuilder::query(db, s, cmd, *ranges, nullptr, db::no_timeout);
// A stateful scan must have attempted at least one cache lookup per page.
if (is_stateful) {
tests::require(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::lookups) >= npages);
}
has_more = res_builder.add(*res);
if (has_more) {
if (page_hook) {
page_hook(npages);
}
npages++;
}
}
return std::pair(std::move(res_builder).get_end_result(), npages);
}
// Single-range convenience overload: wraps `range` into a vector and forwards
// to the multi-range version.
template <typename ResultBuilder>
static std::pair<typename ResultBuilder::end_result_type, size_t>
read_partitions_with_generic_paged_scan(sharded<replica::database>& db, schema_ptr s, uint32_t page_size, uint64_t max_size, stateful_query is_stateful,
        const dht::partition_range& range, const query::partition_slice& slice, const std::function<void(size_t)>& page_hook = {}) {
    dht::partition_range_vector range_vec;
    range_vec.push_back(range);
    return read_partitions_with_generic_paged_scan<ResultBuilder>(db, std::move(s), page_size, max_size, is_stateful, range_vec, slice, page_hook);
}
// ResultBuilder (see read_partitions_with_generic_paged_scan()) that queries
// mutations via query_mutations_on_all_shards() and accumulates them into a
// chunked vector, merging page-boundary partitions. Tracks the last partition
// key, its row count on the current page and the last clustering key, so the
// scan driver can compute where to resume the next page.
class mutation_result_builder {
public:
using end_result_type = utils::chunked_vector<mutation>;
private:
schema_ptr _s;
uint64_t _page_size = 0;
utils::chunked_vector<mutation> _results;
std::optional<dht::decorated_key> _last_pkey;
std::optional<clustering_key> _last_ckey;
uint64_t _last_pkey_rows = 0;
private:
// Last clustering key of the mutation, or nullopt if it has no clustered rows.
std::optional<clustering_key> last_ckey_of(const mutation& mut) const {
if (mut.partition().clustered_rows().empty()) {
return std::nullopt;
}
return mut.partition().clustered_rows().rbegin()->key();
}
public:
// Issues one page's worth of multishard mutation query.
static foreign_ptr<lw_shared_ptr<reconcilable_result>> query(
sharded<replica::database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout) {
return std::get<0>(query_mutations_on_all_shards(db, std::move(s), cmd, ranges, std::move(trace_state), timeout).get());
}
explicit mutation_result_builder(schema_ptr s, const query::partition_slice&, uint64_t page_size) : _s(std::move(s)), _page_size(page_size) { }
// Folds one page into the accumulated results. Returns true if there may be
// more data (short read, or the page was filled to the row limit).
bool add(const reconcilable_result& res) {
auto it = res.partitions().begin();
auto end = res.partitions().end();
if (it == end) {
return false;
}
auto first_mut = it->mut().unfreeze(_s);
_last_pkey = first_mut.decorated_key();
_last_ckey = last_ckey_of(first_mut);
// The first partition of the new page may overlap with the last
// partition of the last page.
if (!_results.empty() && _results.back().decorated_key().equal(*_s, first_mut.decorated_key())) {
_last_pkey_rows += it->row_count();
_results.back().apply(std::move(first_mut));
} else {
_last_pkey_rows = it->row_count();
_results.emplace_back(std::move(first_mut));
}
++it;
for (;it != end; ++it) {
auto mut = it->mut().unfreeze(_s);
_last_pkey = mut.decorated_key();
_last_pkey_rows = it->row_count();
_last_ckey = last_ckey_of(mut);
_results.emplace_back(std::move(mut));
}
return res.is_short_read() || res.row_count() >= _page_size;
}
// Resume position accessors for the paged-scan driver.
const dht::decorated_key& last_pkey() const { return _last_pkey.value(); }
const clustering_key* last_ckey() const { return _last_ckey ? &*_last_ckey : nullptr; }
uint64_t last_pkey_rows() const { return _last_pkey_rows; }
end_result_type get_end_result() && {
return std::move(_results);
}
};
// ResultBuilder (see read_partitions_with_generic_paged_scan()) for the
// data-query path: queries via query_data_on_all_shards() and accumulates the
// rows as a query::result_set. The resume position (last partition key, its
// row count and last clustering key) is reconstructed by deserializing the
// key columns out of each result row.
class data_result_builder {
public:
using end_result_type = query::result_set;
private:
schema_ptr _s;
const query::partition_slice& _slice;
uint64_t _page_size = 0;
std::vector<query::result_set_row> _rows;
std::optional<dht::decorated_key> _last_pkey;
std::optional<clustering_key> _last_ckey;
uint64_t _last_pkey_rows = 0;
// Rebuilds a partition or clustering key from the named key columns of a
// result row.
template <typename Key>
Key extract_key(const query::result_set_row& row, const schema::const_iterator_range_type& key_cols) const {
std::vector<bytes> exploded;
for (const auto& cdef : key_cols) {
exploded.push_back(row.get_data_value(cdef.name_as_text())->serialize_nonnull());
}
return Key::from_exploded(*_s, exploded);
}
dht::decorated_key extract_pkey(const query::result_set_row& row) const {
return dht::decorate_key(*_s, extract_key<partition_key>(row, _s->partition_key_columns()));
}
clustering_key extract_ckey(const query::result_set_row& row) const {
return extract_key<clustering_key>(row, _s->clustering_key_columns());
}
public:
// Issues one page's worth of multishard data query.
static foreign_ptr<lw_shared_ptr<query::result>> query(
sharded<replica::database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout) {
return std::get<0>(query_data_on_all_shards(db, std::move(s), cmd, ranges, query::result_options::only_result(), std::move(trace_state), timeout).get())

;
}
explicit data_result_builder(schema_ptr s, const query::partition_slice& slice, uint64_t page_size) : _s(std::move(s)), _slice(slice), _page_size(page_size) { }
// Folds one page into the accumulated rows. Returns true if there may be
// more data (short read, or the page was filled to the row limit).
bool add(const query::result& raw_res) {
auto res = query::result_set::from_raw_result(_s, _slice, raw_res);
if (res.rows().empty()) {
return false;
}
for (const auto& row : res.rows()) {
_rows.emplace_back(row.copy());
_last_ckey = extract_ckey(row);
auto last_pkey = extract_pkey(row);
// Count consecutive rows of the same partition to know how many rows the
// current partition contributed.
if (_last_pkey && last_pkey.equal(*_s, *_last_pkey)) {
++_last_pkey_rows;
} else {
_last_pkey = std::move(last_pkey);
_last_pkey_rows = 1;
}
}
return raw_res.is_short_read() || res.rows().size() >= _page_size;
}
// Resume position accessors for the paged-scan driver.
const dht::decorated_key& last_pkey() const { return _last_pkey.value(); }
const clustering_key* last_ckey() const { return _last_ckey ? &*_last_ckey : nullptr; }
uint64_t last_pkey_rows() const { return _last_pkey_rows; }
end_result_type get_end_result() && {
return query::result_set(_s, std::move(_rows));
}
};
// Paged scan over a single range, producing mutations (reconcilable results).
static std::pair<utils::chunked_vector<mutation>, size_t>
read_partitions_with_paged_scan(sharded<replica::database>& db, schema_ptr s, uint32_t page_size, uint64_t max_size, stateful_query is_stateful,
        const dht::partition_range& range, const query::partition_slice& slice, const std::function<void(size_t)>& page_hook = {}) {
    return read_partitions_with_generic_paged_scan<mutation_result_builder>(
            db, std::move(s), page_size, max_size, is_stateful, range, slice, page_hook);
}
// Paged scan of the entire ring with the schema's full slice and no
// result-size cap.
static std::pair<utils::chunked_vector<mutation>, size_t>
read_all_partitions_with_paged_scan(sharded<replica::database>& db, schema_ptr s, uint32_t page_size, stateful_query is_stateful,
        const std::function<void(size_t)>& page_hook) {
    constexpr auto no_size_limit = std::numeric_limits<uint64_t>::max();
    return read_partitions_with_paged_scan(db, s, page_size, no_size_limit, is_stateful, query::full_partition_range, s->full_slice(), page_hook);
}
// Sorts both result sets by decorated key (destructively) and asserts they
// are element-wise equal.
void check_results_are_equal(utils::chunked_vector<mutation>& results1, utils::chunked_vector<mutation>& results2) {
    tests::require_equal(results1.size(), results2.size());
    const auto by_key = [] (const mutation& x, const mutation& y) {
        return x.decorated_key().less_compare(*x.schema(), y.decorated_key());
    };
    std::ranges::sort(results1, by_key);
    std::ranges::sort(results2, by_key);
    for (size_t i = 0; i != results1.size(); ++i) {
        testlog.trace("Comparing mutation #{:d}", i);
        assert_that(results2[i]).is_equal_to(results1[i]);
    }
}
namespace multishard_query_test {
// Best run with SMP>=2
// Full-ring paged scan cross-checked against non-paged one-by-one reads.
// The stateful variant additionally verifies querier-cache accounting page by
// page (lookups grow, no drops, misses bounded, at least one saved reader);
// the stateless variant verifies the cache stays empty.
SEASTAR_THREAD_TEST_CASE(test_read_all) {
do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
using namespace std::chrono_literals;
env.db().invoke_on_all([] (replica::database& db) {
db.set_querier_cache_entry_ttl(2s);
}).get();
const auto ks = create_vnodes_keyspace(env);
auto [s, pkeys] = create_test_table(env, ks, get_name());
// First read all partition-by-partition (not paged).
auto results1 = read_all_partitions_one_by_one(env.db(), s, pkeys);
uint64_t lookups = 0;
uint64_t misses = 0;
auto saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
// Then do a paged range-query, with reader caching
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t page) {
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
const auto new_misses = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses);
// Every page after the first must have attempted a cache lookup.
if (page) {
tests::require(new_lookups > lookups);
}
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
// A page can miss at most on the shards that had no saved reader yet.
tests::require_less_equal(new_misses - misses, smp::count - saved_readers);
lookups = new_lookups;
misses = new_misses;
saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
tests::require_greater_equal(saved_readers, 1u);
}).first;
check_results_are_equal(results1, results2);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), 0u);
require_eventually_empty_caches(env.db());
// Then do a paged range-query, without reader caching
auto results3 = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::no, [&] (size_t) {
check_cache_population(env.db(), 0);
}).first;
check_results_are_equal(results1, results3);
return make_ready_future<>();
}, cql_config_with_extensions()).get();
}
// Best run with SMP>=2
// Paged scans over multi-range partition-range vectors. The ranges are built
// to cover the whole ring in adjacent pieces (an open-ended prefix, `step`-
// sized middle ranges, and an open-ended suffix) so the result must equal the
// one-by-one reference reads, for several range widths, page sizes and both
// stateful/stateless scans.
SEASTAR_THREAD_TEST_CASE(test_read_all_multi_range) {
do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
using namespace std::chrono_literals;
env.db().invoke_on_all([] (replica::database& db) {
db.set_querier_cache_entry_ttl(2s);
}).get();
const auto ks = create_vnodes_keyspace(env);
auto [s, pkeys] = create_test_table(env, ks, get_name());
const auto limit = std::numeric_limits<uint64_t>::max();
const auto slice = s->full_slice();
testlog.info("pkeys.size()={}", pkeys.size());
for (const auto step : {1ul, pkeys.size() / 4u, pkeys.size() / 2u}) {
if (!step) {
continue;
}
dht::partition_range_vector ranges;
// Everything strictly before the first key...
ranges.push_back(dht::partition_range::make_ending_with({*pkeys.begin(), false}));
const auto max_r = pkeys.size() - 1;
// ...then [r, r+step) slices of the key list...
for (unsigned r = 0; r < max_r; r += step) {
ranges.push_back(dht::partition_range::make({pkeys.at(r), true}, {pkeys.at(std::min(max_r, r + step)), false}));
}
// ...and everything from the last key onwards.
ranges.push_back(dht::partition_range::make_starting_with({*pkeys.rbegin(), true}));
unsigned i = 0;
testlog.debug("Scan with step={}, ranges={}", step, ranges);
// Keep indent the same to reduce white-space noise
for (const auto page_size : {1, 4, 8, 19, 100}) {
for (const auto stateful : {stateful_query::no, stateful_query::yes}) {
testlog.debug("[scan #{}]: page_size={}, stateful={}", i++, page_size, stateful);
// First read all partition-by-partition (not paged).
auto expected_results = read_all_partitions_one_by_one(env.db(), s, pkeys);
auto results = read_partitions_with_generic_paged_scan<mutation_result_builder>(env.db(), s, page_size, limit, stateful, ranges, slice, [&] (size_t) {
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
}).first;
check_results_are_equal(expected_results, results);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), 0u);
}}
}
require_eventually_empty_caches(env.db());
return make_ready_future<>();
}, cql_config_with_extensions()).get();
}
// Best run with SMP>=2
// Paged scans of a small (2-partition) table across a matrix of per-partition
// row limits, page sizes and stateful/stateless variants, cross-checked
// against one-by-one reads plus per-page querier-cache accounting.
//
// NOTE(review): the `slice` built below (reversed, with the partition row
// limit applied) is not passed to either read — both results1 and results2
// are produced with the full slice — so `partition_row_limit` currently only
// varies the iteration count. Looks like `slice` was meant to be used;
// verify against test_read_reversed which does use its slice.
SEASTAR_THREAD_TEST_CASE(test_read_with_partition_row_limits) {
do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
using namespace std::chrono_literals;
env.db().invoke_on_all([] (replica::database& db) {
db.set_querier_cache_entry_ttl(2s);
}).get();
const auto ks = create_vnodes_keyspace(env);
auto [s, pkeys] = create_test_table(env, ks, get_name(), 2, 10);
unsigned i = 0;
// Keep indent the same to reduce white-space noise
for (const auto partition_row_limit : {1ul, 4ul, 8ul, query::partition_max_rows}) {
for (const auto page_size : {1, 4, 8, 19}) {
for (const auto stateful : {stateful_query::no, stateful_query::yes}) {
testlog.debug("[scan #{}]: partition_row_limit={}, page_size={}, stateful={}", i++, partition_row_limit, page_size, stateful);
const auto slice = partition_slice_builder(*s, s->full_slice())
.reversed()
.with_partition_row_limit(partition_row_limit)
.build();
// First read all partition-by-partition (not paged).
auto results1 = read_all_partitions_one_by_one(env.db(), s, pkeys);
auto misses = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses);
auto lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
auto saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
// Then do a paged range-query
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, page_size, stateful, [&] (size_t page) {
if (!stateful) {
return;
}
const auto new_misses = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses);
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
if (page) {
tests::require(new_lookups > lookups);
}
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_less_equal(new_misses - misses, smp::count - saved_readers);
lookups = new_lookups;
misses = new_misses;
saved_readers = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::population);
tests::require_greater_equal(saved_readers, 1u);
}).first;
check_results_are_equal(results1, results2);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), 0u);
} } }
return make_ready_future<>();
}, cql_config_with_extensions()).get();
}
// Best run with SMP>=2
// Runs a stateful paged scan and, after each page, forcibly evicts one cached
// querier from some shard. The scan must still produce correct results, and
// the resource_based_evictions counter must match the number of forced
// evictions exactly.
SEASTAR_THREAD_TEST_CASE(test_evict_a_shard_reader_on_each_page) {
do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
using namespace std::chrono_literals;
env.db().invoke_on_all([] (replica::database& db) {
db.set_querier_cache_entry_ttl(2s);
}).get();
const auto ks = create_vnodes_keyspace(env);
auto [s, pkeys] = create_test_table(env, ks, get_name());
// First read all partition-by-partition (not paged).
auto results1 = read_all_partitions_one_by_one(env.db(), s, pkeys);
int64_t lookups = 0;
uint64_t evictions = 0;
// Then do a paged range-query
auto [results2, npages] = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t page) {
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::lookups);
if (page) {
tests::require(std::cmp_greater(new_lookups, lookups), std::source_location::current());
}
lookups = new_lookups;
// Evict at most one cached querier, from the first shard that has one.
for (unsigned shard = 0; shard < smp::count; ++shard) {
auto evicted = smp::submit_to(shard, [&] {
return env.local_db().get_querier_cache().evict_one();
}).get();
if (evicted) {
++evictions;
break;
}
}
tests::require(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::misses) >= page, std::source_location::current());
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u, std::source_location::current());
});
check_results_are_equal(results1, results2);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(env.db(), &replica::querier_cache::stats::resource_based_evictions), evictions);
require_eventually_empty_caches(env.db());
return make_ready_future<>();
}, cql_config_with_extensions()).get();
}
// Best run with SMP>=2
// Paged scans with a reversed schema and a reversed slice, across a matrix of
// per-partition row limits, page sizes and stateful/stateless variants. Both
// the mutation path and the data path are checked against one-by-one reads
// (the data-path result is compared row-wise via query::result_set).
SEASTAR_THREAD_TEST_CASE(test_read_reversed) {
do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
using namespace std::chrono_literals;
auto& db = env.db();
const auto ks = create_vnodes_keyspace(env);
auto [s, pkeys] = create_test_table(env, ks, get_name(), 4, 8);
s = s->make_reversed();
unsigned i = 0;
// Keep indent the same to reduce white-space noise
for (const auto partition_row_limit : {1ul, 4ul, 8ul, query::partition_max_rows}) {
for (const auto page_size : {1, 4, 8, 19}) {
for (const auto stateful : {stateful_query::no, stateful_query::yes}) {
testlog.debug("[scan #{}]: partition_row_limit={}, page_size={}, stateful={}", i++, partition_row_limit, page_size, stateful);
const auto slice = partition_slice_builder(*s, s->full_slice())
.reversed()
.with_partition_row_limit(partition_row_limit)
.build();
// First read all partition-by-partition (not paged).
auto expected_results = read_all_partitions_one_by_one(env.db(), s, pkeys, slice);
auto [mutation_results, _np1] = read_partitions_with_generic_paged_scan<mutation_result_builder>(db, s, page_size, std::numeric_limits<uint64_t>::max(), stateful,
query::full_partition_range, slice);
check_results_are_equal(expected_results, mutation_results);
auto [data_results, _np2] = read_partitions_with_generic_paged_scan<data_result_builder>(db, s, page_size, std::numeric_limits<uint64_t>::max(), stateful,
query::full_partition_range, slice);
// Flatten the expected mutations into rows for the data-path comparison.
std::vector<query::result_set_row> expected_rows;
for (const auto& mut : expected_results) {
auto rs = query::result_set(mut);
std::ranges::copy(rs.rows() | std::views::transform([](const auto& row) { return row.copy(); }), std::back_inserter(expected_rows));
}
auto expected_data_results = query::result_set(s, std::move(expected_rows));
BOOST_REQUIRE_EQUAL(data_results, expected_data_results);
tests::require_equal(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::drops), 0u);
tests::require_equal(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::time_based_evictions), 0u);
tests::require_equal(aggregate_querier_cache_stat(db, &replica::querier_cache::stats::resource_based_evictions), 0u);
} } }
require_eventually_empty_caches(env.db());
return make_ready_future<>();
}, cql_config_with_extensions()).get();
}
} // multishard_query_test namespace
namespace {
// Fixed-capacity output sink over a `bytes` buffer: writes beyond the
// capacity given at construction are silently truncated. Provides the
// write(bytes_view) / write(const char*, size_t) interface ser::serialize()
// expects from an output stream.
class buffer_ostream {
    size_t _size_remaining;
    bytes _buf;
    bytes::value_type* _pos;
public:
    explicit buffer_ostream(size_t size)
        : _size_remaining(size)
        , _buf(bytes::initialized_later{}, size)
        , _pos(_buf.data()) {
    }
    void write(bytes_view v) {
        if (_size_remaining == 0) {
            return; // capacity exhausted -- drop the rest
        }
        const auto n = std::min(_size_remaining, v.size());
        _pos = std::copy_n(v.data(), n, _pos);
        _size_remaining -= n;
    }
    void write(const char* ptr, size_t size) {
        write(bytes_view(reinterpret_cast<const bytes_view::value_type*>(ptr), size));
    }
    // Hands out the underlying buffer; the stream must not be used afterwards.
    bytes detach() && {
        return std::move(_buf);
    }
};
template <typename T>
static size_t calculate_serialized_size(const T& v) {
struct {
size_t _size = 0;
void write(bytes_view v) { _size += v.size(); }
void write(const char*, size_t size) { _size += size; }
} os;
ser::serialize(os, v);
return os._size;
}
// Header describing a test-generated binary blob; (de)serialized
// field-by-field by ser::serializer<blob_header> below.
// NOTE(review): field semantics inferred from names — presumably the
// flags record whether the blob embeds the partition/clustering key;
// confirm against the blob-generation code.
struct blob_header {
    uint32_t size;    // payload size in bytes
    bool includes_pk; // blob embeds the partition key (presumably)
    bool has_ck;      // schema has a clustering key (presumably)
    bool includes_ck; // blob embeds the clustering key (presumably)
};
} // anonymous namespace
namespace ser {
template <>
struct serializer<blob_header> {
template <typename Input>
static blob_header read(Input& in) {
blob_header head;
head.size = ser::deserialize(in, std::type_identity<int>{});
head.includes_pk = ser::deserialize(in, std::type_identity<bool>{});
head.has_ck = ser::deserialize(in, std::type_identity<bool>{});
head.includes_ck = ser::deserialize(in, std::type_identity<bool>{});
return head;
}
template <typename Output>
static void write(Output& out, blob_header head) {
ser::serialize(out, head.size);
ser::serialize(out, head.includes_pk);
ser::serialize(out, head.has_ck);
ser::serialize(out, head.includes_ck);
}
template <typename Input>
static void skip(Input& in) {
ser::skip(in, std::type_identity<int>{});
ser::skip(in, std::type_identity<bool>{});
ser::skip(in, std::type_identity<bool>{});
ser::skip(in, std::type_identity<bool>{});
}
};
} // namespace ser
namespace {
// Generate a random interval<int> whose defined bounds lie in [start, end].
// Each side independently has a 1-in-8 chance of being open-ended (the
// lower bound only when allow_open_ended_start); bound inclusiveness is
// drawn randomly too.
//
// NOTE: the count and order of draws from rnd_engine is part of the test's
// replayability contract (runs are reproduced from the logged seed) — keep
// the draw sequence stable when modifying this function.
template <typename RandomEngine>
static interval<int> generate_range(RandomEngine& rnd_engine, int start, int end, bool allow_open_ended_start = true) {
    SCYLLA_ASSERT(start < end);
    std::uniform_int_distribution<int> defined_bound_dist(0, 7);
    std::uniform_int_distribution<int8_t> inclusive_dist(0, 1);
    std::uniform_int_distribution<int> bound_dist(start, end);
    // defined_bound_dist yields 0 with probability 1/8; note the short
    // circuit: no draw happens for the lower bound when open starts are
    // disallowed.
    const auto open_lower_bound = allow_open_ended_start && !defined_bound_dist(rnd_engine);
    const auto open_upper_bound = !defined_bound_dist(rnd_engine);
    if (open_lower_bound || open_upper_bound) {
        const auto bound = bound_dist(rnd_engine);
        if (open_lower_bound) {
            // Open start: (-inf, bound].
            return interval<int>::make_ending_with(
                    interval<int>::bound(bound, inclusive_dist(rnd_engine)));
        }
        // Open end: [bound, +inf).
        return interval<int>::make_starting_with(
                interval<int>::bound(bound, inclusive_dist(rnd_engine)));
    }
    const auto b1 = bound_dist(rnd_engine);
    const auto b2 = bound_dist(rnd_engine);
    if (b1 == b2) {
        // Degenerate draw (equal bounds): fall back to an open-ended interval.
        return interval<int>::make_starting_with(
                interval<int>::bound(b1, inclusive_dist(rnd_engine)));
    }
    return interval<int>::make(
            interval<int>::bound(std::min(b1, b2), inclusive_dist(rnd_engine)),
            interval<int>::bound(std::max(b1, b2), inclusive_dist(rnd_engine)));
}
// Generate a random list of clustering ranges over the clustering keys
// actually present in `mutations`.
//
// Ranges are produced left-to-right over the sorted union of all observed
// clustering keys, each starting at-or-after the previous one's end, so the
// result is ordered and disjoint. Returns {} (i.e. the full clustering
// range) when the schema has no clustering key, or when fewer than two
// distinct clustering keys exist.
template <typename RandomEngine>
static query::clustering_row_ranges
generate_clustering_ranges(RandomEngine& rnd_engine, const schema& schema, const utils::chunked_vector<mutation>& mutations) {
    if (!schema.clustering_key_size()) {
        return {};
    }
    // Collect all distinct clustering keys, in clustering order.
    std::vector<clustering_key> all_cks;
    std::set<clustering_key, clustering_key::less_compare> all_cks_sorted{clustering_key::less_compare(schema)};
    for (const auto& mut : mutations) {
        for (const auto& row : mut.partition().clustered_rows()) {
            all_cks_sorted.insert(row.key());
        }
        thread::maybe_yield();
    }
    // generate_range() requires start < end, so with fewer than two distinct
    // clustering keys no index range can be generated (previously `end`
    // would be <= 0 here and the loop below would trip the assert inside
    // generate_range()). Fall back to the full clustering range, just like
    // the no-clustering-key case above.
    if (all_cks_sorted.size() < 2) {
        return {};
    }
    all_cks.reserve(all_cks_sorted.size());
    all_cks.insert(all_cks.end(), all_cks_sorted.cbegin(), all_cks_sorted.cend());
    query::clustering_row_ranges clustering_key_ranges;
    int start = 0;
    const int end = all_cks.size() - 1;
    do {
        // Only the very first range may have an open start; advance `start`
        // past the generated range's end so consecutive ranges stay disjoint.
        auto clustering_index_range = generate_range(rnd_engine, start, end, start == 0);
        if (clustering_index_range.end()) {
            start = clustering_index_range.end()->value() + clustering_index_range.end()->is_inclusive();
        } else {
            start = end;
        }
        // Map key indexes back to the actual clustering keys.
        clustering_key_ranges.emplace_back(clustering_index_range.transform([&all_cks] (int i) {
            return all_cks.at(i);
        }));
    } while (start < end);
    return clustering_key_ranges;
}
// Compute the expected scan result: select the partitions addressed by
// `partition_index_range` and slice each one down to the slice's default
// clustering row ranges.
static utils::chunked_vector<mutation>
slice_partitions(const schema& schema, const utils::chunked_vector<mutation>& partitions,
        const interval<int>& partition_index_range, const query::partition_slice& slice) {
    // Translate the (possibly open-ended, possibly exclusive) index
    // interval into a [first, last) iterator pair.
    const auto& start_bound = partition_index_range.start();
    const auto& end_bound = partition_index_range.end();
    auto first = partitions.cbegin();
    if (start_bound) {
        first += start_bound->value() + !start_bound->is_inclusive();
    }
    auto last = partitions.cend();
    if (end_bound) {
        last = partitions.cbegin() + end_bound->value() + end_bound->is_inclusive();
    }
    const auto& row_ranges = slice.default_row_ranges();
    utils::chunked_vector<mutation> sliced;
    while (first != last) {
        sliced.push_back(first->sliced(row_ranges));
        ++first;
        thread::maybe_yield();
    }
    return sliced;
}
// Verify that the scan returned exactly as many partitions as expected;
// on mismatch, log which partition keys are extra (or missing) and fail
// the test.
static void
validate_result_size(size_t i, schema_ptr schema, const utils::chunked_vector<mutation>& results, const utils::chunked_vector<mutation>& expected_partitions) {
    if (results.size() == expected_partitions.size()) {
        return;
    }
    using key_set = std::set<dht::decorated_key, dht::decorated_key::less_comparator>;
    // Collect the decorated keys of a partition list, in ring order.
    auto collect_keys = [&schema] (const utils::chunked_vector<mutation>& muts) {
        key_set keys{dht::decorated_key::less_comparator(schema)};
        for (const auto& m : muts) {
            keys.insert(m.decorated_key());
        }
        return keys;
    };
    const auto expected = collect_keys(expected_partitions);
    const auto actual = collect_keys(results);
    // Keys present in `a` but absent from `b`.
    auto key_difference = [&schema] (const key_set& a, const key_set& b) {
        std::vector<dht::decorated_key> diff;
        std::set_difference(a.cbegin(), a.cend(), b.cbegin(), b.cend(), std::back_inserter(diff),
                dht::decorated_key::less_comparator(schema));
        return diff;
    };
    if (results.size() > expected_partitions.size()) {
        const auto diff = key_difference(actual, expected);
        testlog.error("[scan#{}]: got {} more partitions than expected, extra partitions: {}", i, diff.size(), diff);
        tests::fail(format("Got {} more partitions than expected", diff.size()));
    } else {
        const auto diff = key_difference(expected, actual);
        testlog.error("[scan#{}]: got {} less partitions than expected, missing partitions: {}", i, diff.size(), diff);
        tests::fail(format("Got {} less partitions than expected", diff.size()));
    }
}
// Knobs for the fuzzy scan workload; concrete values are chosen per build
// mode (DEBUG/DEVEL/release).
struct fuzzy_test_config {
    uint32_t seed;                // base seed; each scan derives its own from it (see run_fuzzy_test_scan)
    std::chrono::seconds timeout; // NOTE(review): only logged in this file — presumably enforced elsewhere, confirm
    unsigned concurrency;         // max scans in flight at a time, per shard
    unsigned scans;               // total number of scans, per shard
};
// Run one randomized paged scan (scan #i of the workload) and validate its
// output against an independently computed expectation.
//
// All scan parameters (partition range, clustering ranges, statefulness)
// derive from a per-scan seed, so a failing scan can be replayed from the
// logged seed. Keep the order of rnd_engine draws stable.
static void
run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, sharded<replica::database>& db, schema_ptr schema,
        const utils::chunked_vector<frozen_mutation>& frozen_mutations) {
    // Distinct seed per (scan, shard) so concurrent scans explore different inputs.
    const auto seed = cfg.seed + (i + 1) * this_shard_id();
    auto rnd_engine = std::mt19937(seed);
    // Thaw the reference data set (shared in frozen form across shards).
    utils::chunked_vector<mutation> mutations;
    for (const auto& mut : frozen_mutations) {
        mutations.emplace_back(mut.unfreeze(schema));
        thread::maybe_yield();
    }
    // Pick a random partition index interval and map it onto ring positions.
    auto partition_index_range = generate_range(rnd_engine, 0, mutations.size() - 1);
    auto partition_range = partition_index_range.transform([&mutations] (int i) {
        return dht::ring_position(mutations[i].decorated_key());
    });
    // Restrict the slice to random clustering ranges; allow short reads so
    // the paging machinery is exercised harder.
    const auto partition_slice = partition_slice_builder(*schema)
        .with_ranges(generate_clustering_ranges(rnd_engine, *schema, mutations))
        .with_option<query::partition_slice::option::allow_short_read>()
        .build();
    // 1-in-4 chance of a stateless query (stateful_query(false)).
    const auto is_stateful = stateful_query(std::uniform_int_distribution<int>(0, 3)(rnd_engine));
    testlog.debug("[scan#{}]: seed={}, is_stateful={}, prange={}, ckranges={}", i, seed, is_stateful, partition_range,
            partition_slice.default_row_ranges());
    const auto [results, npages] = read_partitions_with_paged_scan(db, schema, 1000, 1024, is_stateful, partition_range, partition_slice);
    // Compute the expected result locally and compare, partition count first,
    // then partition-by-partition content.
    const auto expected_partitions = slice_partitions(*schema, mutations, partition_index_range, partition_slice);
    validate_result_size(i, schema, results, expected_partitions);
    auto exp_it = expected_partitions.cbegin();
    auto res_it = results.cbegin();
    while (res_it != results.cend() && exp_it != expected_partitions.cend()) {
        assert_that(*res_it++).is_equal_to(*exp_it++);
        thread::maybe_yield();
    }
    testlog.trace("[scan#{}]: validated all partitions, both the expected and actual partition list should be exhausted now", i);
    tests::require(res_it == results.cend() && exp_it == expected_partitions.cend());
}
// Invoke func(0) .. func(count - 1) with at most `concurrency` invocations
// in flight at a time (enforced by the semaphore). All started invocations
// are tracked through the gate; the returned future resolves once every
// one of them completed. When an invocation fails, its error is captured,
// not-yet-started invocations are skipped, and the (last captured) error is
// propagated through the returned future after the gate closes.
future<> run_concurrently(size_t count, size_t concurrency, noncopyable_function<future<>(size_t)> func) {
    return do_with(std::move(func), gate(), semaphore(concurrency), std::exception_ptr(),
            [count] (noncopyable_function<future<>(size_t)>& func, gate& gate, semaphore& sem, std::exception_ptr& error) {
        for (size_t i = 0; i < count; ++i) {
            // Future is waited on indirectly (via `gate`).
            (void)with_gate(gate, [&func, &sem, &error, i] {
                return with_semaphore(sem, 1, [&func, &error, i] {
                    if (error) {
                        testlog.trace("Skipping func({}) due to previous func() returning with error", i);
                        return make_ready_future<>();
                    }
                    testlog.trace("Invoking func({})", i);
                    // Swallow the failure here so the gate/semaphore
                    // bookkeeping stays intact; re-raise after the gate closes.
                    auto f = func(i).handle_exception([&error, i] (std::exception_ptr e) {
                        testlog.debug("func({}) invocation returned with error: {}", i, e);
                        error = std::move(e);
                    });
                    return f;
                });
            });
        }
        return gate.close().then([&error] {
            if (error) {
                testlog.trace("Run failed, returning with error: {}", error);
                return make_exception_future<>(std::move(error));
            }
            testlog.trace("Run succeeded");
            return make_ready_future<>();
        });
    });
}
// Run cfg.scans fuzzy scans on the local shard, at most cfg.concurrency at
// a time, each executing run_fuzzy_test_scan() in a seastar thread.
static future<>
run_fuzzy_test_workload(fuzzy_test_config cfg, sharded<replica::database>& db, schema_ptr schema,
        const utils::chunked_vector<frozen_mutation>& frozen_mutations) {
    auto scan = [cfg, &db, schema = std::move(schema), &frozen_mutations] (size_t i) {
        return seastar::async([i, cfg, &db, schema, &frozen_mutations] () mutable {
            run_fuzzy_test_scan(i, cfg, db, std::move(schema), frozen_mutations);
        });
    };
    return run_concurrently(cfg.scans, cfg.concurrency, std::move(scan));
}
} // namespace
namespace multishard_query_test {
// Fuzzy end-to-end test of multishard paged scans: populate a randomly
// generated table, then run many randomized concurrent scans on every
// shard, validating each scan's result against a locally computed
// expectation (see run_fuzzy_test_scan()).
SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
    auto cql_cfg = cql_config_with_extensions();
    // NOTE(review): commitlog disabled — presumably to speed up the data
    // population; durability is not under test here.
    cql_cfg.db_config->enable_commitlog(false);
    do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
        // REPLACE RANDOM SEED HERE.
        const auto seed = tests::random::get_int<uint32_t>();
        testlog.info("fuzzy test seed: {}", seed);
        const auto ks = create_vnodes_keyspace(env);
        // Populate the test table with random content; data-set size scales
        // with build mode (debug builds are much slower).
        auto tbl = create_test_table(env, seed, ks, get_name(), false,
#ifdef DEBUG
            std::uniform_int_distribution<size_t>(8, 32), // partitions
            std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
            std::uniform_int_distribution<size_t>(0, 10), // range-tombstones
#elif DEVEL
            std::uniform_int_distribution<size_t>(16, 64), // partitions
            std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
            std::uniform_int_distribution<size_t>(0, 100), // range-tombstones
#else
            std::uniform_int_distribution<size_t>(32, 64), // partitions
            std::uniform_int_distribution<size_t>(0, 1000), // clustering-rows
            std::uniform_int_distribution<size_t>(0, 1000), // range-tombstones
#endif
            tests::default_timestamp_generator());
        // Workload intensity also scales with build mode.
#if defined DEBUG
        auto cfg = fuzzy_test_config{seed, std::chrono::seconds{8}, 1, 1};
#elif defined DEVEL
        auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 2, 4};
#else
        auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 4, 8};
#endif
        testlog.info("Running test workload with configuration: seed={}, timeout={}s, concurrency={}, scans={}", cfg.seed, cfg.timeout.count(),
                cfg.concurrency, cfg.scans);
        // Run the workload on every shard in parallel; the schema is passed
        // as a global_schema_ptr so each shard gets its own local reference.
        smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(tbl.schema), &compacted_frozen_mutations = tbl.compacted_frozen_mutations] {
            return run_fuzzy_test_workload(cfg, *db, gs.get(), compacted_frozen_mutations);
        }).handle_exception([seed] (std::exception_ptr e) {
            testlog.error("Test workload failed with exception {}."
                    " To repeat this particular run, replace the random seed of the test, with that of this run ({})."
                    " Look for `REPLACE RANDOM SEED HERE` in the source of the test.",
                    e,
                    seed);
            // Fail the test on any exception.
            BOOST_FAIL("Test run finished with exception");
        }).get();
        return make_ready_future<>();
    }, cql_cfg).get();
}
BOOST_AUTO_TEST_SUITE_END()