mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-29 12:47:02 +00:00
Merge 'test/boost/multishard_mutation_query: use random schema' from Botond Dénes
This test currently uses `test/lib/test_table.hh` to generate data for its test cases. This data generation facility is used by no other tests. Worse, it is redundant as we already have a random data generator with fixed schema, in `test/lib/mutation_source_test.hh`. So in this series, we migrate the test cases in said test file to random schema and its random data generation facilities. These are used by several other test cases and using random schema allows us to cover a wider (quasi-infinite) number of possibilities. After migrating all tests away from it, `test/lib/test_table.hh` is removed. This series also reduces the runtime of `fuzzy_test` drastically. It should now run in a few minutes or even in seconds (depending on the machine). Fixes: #12944 Closes #12574 * github.com:scylladb/scylladb: test/lib: rm test_table.hh test/boos/multishard_mutation_query_test: migrate other tests to random schema test/boost/multishard_mutation_query_test: use ks keyspace test/boost/multishard_mutation_query_test: improve test pager test/boost/multishard_mutation_query_test: refactor fuzzy_test test/boost: add multishard_mutation_query_test more memory types/user: add get_name() accessor test/lib/random_schema: add create_with_cql() test/lib/random_schema: fix udt handling test/lib/random_schema: type_generator(): also generate frozen types test/lib/random_schema: type_generator(): make static column generation conditional test/lib/random_schema: type_generator(): don't generate duration_type for keys test/lib/random_schema: generate_random_mutations(): add overload with seed test/lib/random_schema: generate_random_mutations(): respect range tombstone count param test/lib/random_schema: generate_random_mutations(): add yields test/lib/random_schema: generate_random_mutations(): fix indentation test/lib/random_schema: generate_random_mutations(): coroutinize method test/lib/random_schema: generate_random_mutations(): expand comment
This commit is contained in:
@@ -17,10 +17,10 @@
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "test/lib/mutation_assertions.hh"
|
||||
#include "test/lib/test_table.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/random_schema.hh"
|
||||
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
|
||||
@@ -28,7 +28,116 @@
|
||||
|
||||
#include <boost/range/algorithm/sort.hpp>
|
||||
|
||||
const sstring KEYSPACE_NAME = "multishard_mutation_query_test";
|
||||
const sstring KEYSPACE_NAME = "ks";
|
||||
|
||||
namespace {
|
||||
|
||||
struct generated_table {
|
||||
schema_ptr schema;
|
||||
std::vector<dht::decorated_key> keys;
|
||||
std::vector<frozen_mutation> compacted_frozen_mutations;
|
||||
};
|
||||
|
||||
class random_schema_specification : public tests::random_schema_specification {
|
||||
sstring _table_name;
|
||||
std::unique_ptr<tests::random_schema_specification> _underlying_spec;
|
||||
public:
|
||||
random_schema_specification(sstring ks_name, sstring table_name, bool force_clustering_column)
|
||||
: tests::random_schema_specification(std::move(ks_name))
|
||||
, _table_name(std::move(table_name))
|
||||
, _underlying_spec(tests::make_random_schema_specification(
|
||||
keyspace_name(),
|
||||
std::uniform_int_distribution<size_t>(1, 4),
|
||||
std::uniform_int_distribution<size_t>(size_t(force_clustering_column), 4),
|
||||
std::uniform_int_distribution<size_t>(1, 4),
|
||||
std::uniform_int_distribution<size_t>(0, 4)))
|
||||
{ }
|
||||
virtual sstring table_name(std::mt19937& engine) override { return _table_name; }
|
||||
virtual sstring udt_name(std::mt19937& engine) override { return _underlying_spec->udt_name(engine); }
|
||||
virtual std::vector<data_type> partition_key_columns(std::mt19937& engine) override { return _underlying_spec->partition_key_columns(engine); }
|
||||
virtual std::vector<data_type> clustering_key_columns(std::mt19937& engine) override { return _underlying_spec->clustering_key_columns(engine); }
|
||||
virtual std::vector<data_type> regular_columns(std::mt19937& engine) override { return _underlying_spec->regular_columns(engine); }
|
||||
virtual std::vector<data_type> static_columns(std::mt19937& engine) override { return _underlying_spec->static_columns(engine); }
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
static generated_table create_test_table(
|
||||
cql_test_env& env,
|
||||
uint32_t seed,
|
||||
sstring ks_name,
|
||||
sstring tbl_name,
|
||||
bool force_clustering_column,
|
||||
std::uniform_int_distribution<size_t> partitions,
|
||||
std::uniform_int_distribution<size_t> clustering_rows,
|
||||
std::uniform_int_distribution<size_t> range_tombstones,
|
||||
tests::timestamp_generator ts_gen) {
|
||||
auto random_schema_spec = std::make_unique<random_schema_specification>(ks_name, tbl_name, force_clustering_column);
|
||||
auto random_schema = tests::random_schema(seed, *random_schema_spec);
|
||||
|
||||
testlog.info("\n{}", random_schema.cql());
|
||||
|
||||
random_schema.create_with_cql(env).get();
|
||||
|
||||
const auto mutations = tests::generate_random_mutations(
|
||||
seed,
|
||||
random_schema,
|
||||
ts_gen,
|
||||
tests::no_expiry_expiry_generator(),
|
||||
partitions,
|
||||
clustering_rows,
|
||||
range_tombstones).get();
|
||||
|
||||
auto schema = random_schema.schema();
|
||||
|
||||
std::vector<dht::decorated_key> keys;
|
||||
std::vector<frozen_mutation> compacted_frozen_mutations;
|
||||
keys.reserve(mutations.size());
|
||||
compacted_frozen_mutations.reserve(mutations.size());
|
||||
{
|
||||
gate write_gate;
|
||||
for (const auto& mut : mutations) {
|
||||
keys.emplace_back(mut.decorated_key());
|
||||
compacted_frozen_mutations.emplace_back(freeze(mut.compacted()));
|
||||
(void)with_gate(write_gate, [&] {
|
||||
return smp::submit_to(dht::shard_of(*schema, mut.decorated_key().token()), [&env, gs = global_schema_ptr(schema), mut = freeze(mut)] () mutable {
|
||||
return env.local_db().apply(gs.get(), std::move(mut), {}, db::commitlog_force_sync::no, db::no_timeout);
|
||||
});
|
||||
});
|
||||
thread::maybe_yield();
|
||||
}
|
||||
write_gate.close().get();
|
||||
}
|
||||
|
||||
return {random_schema.schema(), keys, compacted_frozen_mutations};
|
||||
}
|
||||
|
||||
api::timestamp_type no_tombstone_timestamp_generator(std::mt19937& engine, tests::timestamp_destination destination, api::timestamp_type min_timestamp) {
|
||||
switch (destination) {
|
||||
case tests::timestamp_destination::partition_tombstone:
|
||||
case tests::timestamp_destination::row_tombstone:
|
||||
case tests::timestamp_destination::collection_tombstone:
|
||||
case tests::timestamp_destination::range_tombstone:
|
||||
return api::missing_timestamp;
|
||||
default:
|
||||
return std::uniform_int_distribution<api::timestamp_type>(min_timestamp, api::max_timestamp)(engine);
|
||||
}
|
||||
}
|
||||
|
||||
static std::pair<schema_ptr, std::vector<dht::decorated_key>> create_test_table(cql_test_env& env, sstring ks_name,
|
||||
sstring tbl_name, int partition_count = 10 * smp::count, int row_per_partition_count = 10) {
|
||||
auto res = create_test_table(
|
||||
env,
|
||||
tests::random::get_int<uint32_t>(),
|
||||
std::move(ks_name),
|
||||
std::move(tbl_name),
|
||||
true,
|
||||
std::uniform_int_distribution<size_t>(partition_count, partition_count),
|
||||
std::uniform_int_distribution<size_t>(row_per_partition_count, row_per_partition_count),
|
||||
std::uniform_int_distribution<size_t>(0, 0),
|
||||
no_tombstone_timestamp_generator);
|
||||
return {std::move(res.schema), std::move(res.keys)};
|
||||
}
|
||||
|
||||
static uint64_t aggregate_querier_cache_stat(distributed<replica::database>& db, uint64_t query::querier_cache::stats::*stat) {
|
||||
return map_reduce(boost::irange(0u, smp::count), [stat, &db] (unsigned shard) {
|
||||
@@ -70,7 +179,7 @@ SEASTAR_THREAD_TEST_CASE(test_abandoned_read) {
|
||||
db.set_querier_cache_entry_ttl(1s);
|
||||
}).get();
|
||||
|
||||
auto [s, _] = test::create_test_table(env, KEYSPACE_NAME, "test_abandoned_read");
|
||||
auto [s, _] = create_test_table(env, KEYSPACE_NAME, get_name());
|
||||
(void)_;
|
||||
|
||||
auto cmd = query::read_command(
|
||||
@@ -88,10 +197,6 @@ SEASTAR_THREAD_TEST_CASE(test_abandoned_read) {
|
||||
|
||||
query_mutations_on_all_shards(env.db(), s, cmd, {query::full_partition_range}, nullptr, db::no_timeout).get();
|
||||
|
||||
check_cache_population(env.db(), 1);
|
||||
|
||||
sleep(2s).get();
|
||||
|
||||
require_eventually_empty_caches(env.db());
|
||||
|
||||
return make_ready_future<>();
|
||||
@@ -133,7 +238,7 @@ static std::pair<typename ResultBuilder::end_result_type, size_t>
|
||||
read_partitions_with_generic_paged_scan(distributed<replica::database>& db, schema_ptr s, uint32_t page_size, uint64_t max_size, stateful_query is_stateful,
|
||||
const dht::partition_range_vector& original_ranges, const query::partition_slice& slice, const std::function<void(size_t)>& page_hook = {}) {
|
||||
const auto query_uuid = is_stateful ? query_id::create_random_id() : query_id::create_null_id();
|
||||
ResultBuilder res_builder(s, slice);
|
||||
ResultBuilder res_builder(s, slice, page_size);
|
||||
auto cmd = query::read_command(
|
||||
s->id(),
|
||||
s->version(),
|
||||
@@ -156,25 +261,23 @@ read_partitions_with_generic_paged_scan(distributed<replica::database>& db, sche
|
||||
auto res = ResultBuilder::query(db, s, cmd, *ranges, nullptr, db::no_timeout);
|
||||
has_more = res_builder.add(*res);
|
||||
cmd.is_first_page = query::is_first_page::no;
|
||||
if (page_hook && has_more) {
|
||||
page_hook(0);
|
||||
}
|
||||
}
|
||||
|
||||
if (!has_more) {
|
||||
return std::pair(std::move(res_builder).get_end_result(), 1);
|
||||
}
|
||||
|
||||
unsigned npages = 0;
|
||||
|
||||
const auto cmp = dht::ring_position_comparator(*s);
|
||||
|
||||
unsigned npages = 1;
|
||||
|
||||
// Rest of the pages. Loop until an empty page turns up. Not very
|
||||
// sophisticated but simple and safe.
|
||||
while (has_more) {
|
||||
if (page_hook) {
|
||||
page_hook(npages);
|
||||
}
|
||||
|
||||
++npages;
|
||||
|
||||
// Force freeing the vector to avoid hiding any bugs related to storing
|
||||
// references to the ranges vector (which is not alive between pages in
|
||||
// real life).
|
||||
@@ -212,6 +315,12 @@ read_partitions_with_generic_paged_scan(distributed<replica::database>& db, sche
|
||||
}
|
||||
|
||||
has_more = res_builder.add(*res);
|
||||
if (has_more) {
|
||||
if (page_hook) {
|
||||
page_hook(npages);
|
||||
}
|
||||
npages++;
|
||||
}
|
||||
}
|
||||
|
||||
return std::pair(std::move(res_builder).get_end_result(), npages);
|
||||
@@ -232,6 +341,7 @@ public:
|
||||
private:
|
||||
schema_ptr _s;
|
||||
const query::partition_slice& _slice;
|
||||
uint64_t _page_size = 0;
|
||||
std::vector<mutation> _results;
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
std::optional<clustering_key> _last_ckey;
|
||||
@@ -260,7 +370,7 @@ public:
|
||||
return std::get<0>(query_mutations_on_all_shards(db, std::move(s), cmd, ranges, std::move(trace_state), timeout).get());
|
||||
}
|
||||
|
||||
explicit mutation_result_builder(schema_ptr s, const query::partition_slice& slice) : _s(std::move(s)), _slice(slice) { }
|
||||
explicit mutation_result_builder(schema_ptr s, const query::partition_slice& slice, uint64_t page_size) : _s(std::move(s)), _slice(slice), _page_size(page_size) { }
|
||||
|
||||
bool add(const reconcilable_result& res) {
|
||||
auto it = res.partitions().begin();
|
||||
@@ -292,7 +402,7 @@ public:
|
||||
_results.emplace_back(std::move(mut));
|
||||
}
|
||||
|
||||
return true;
|
||||
return res.is_short_read() || res.row_count() >= _page_size;
|
||||
}
|
||||
|
||||
const dht::decorated_key& last_pkey() const { return _last_pkey.value(); }
|
||||
@@ -311,6 +421,7 @@ public:
|
||||
private:
|
||||
schema_ptr _s;
|
||||
const query::partition_slice& _slice;
|
||||
uint64_t _page_size = 0;
|
||||
std::vector<query::result_set_row> _rows;
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
std::optional<clustering_key> _last_ckey;
|
||||
@@ -344,7 +455,7 @@ public:
|
||||
return std::get<0>(query_data_on_all_shards(db, std::move(s), cmd, ranges, query::result_options::only_result(), std::move(trace_state), timeout).get());
|
||||
}
|
||||
|
||||
explicit data_result_builder(schema_ptr s, const query::partition_slice& slice) : _s(std::move(s)), _slice(slice) { }
|
||||
explicit data_result_builder(schema_ptr s, const query::partition_slice& slice, uint64_t page_size) : _s(std::move(s)), _slice(slice), _page_size(page_size) { }
|
||||
|
||||
bool add(const query::result& raw_res) {
|
||||
auto res = query::result_set::from_raw_result(_s, _slice, raw_res);
|
||||
@@ -362,7 +473,7 @@ public:
|
||||
_last_pkey_rows = 1;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return raw_res.is_short_read() || res.rows().size() >= _page_size;
|
||||
}
|
||||
|
||||
const dht::decorated_key& last_pkey() const { return _last_pkey.value(); }
|
||||
@@ -410,14 +521,20 @@ SEASTAR_THREAD_TEST_CASE(test_read_all) {
|
||||
db.set_querier_cache_entry_ttl(2s);
|
||||
}).get();
|
||||
|
||||
auto [s, pkeys] = test::create_test_table(env, KEYSPACE_NAME, "test_read_all");
|
||||
auto [s, pkeys] = create_test_table(env, KEYSPACE_NAME, get_name());
|
||||
|
||||
// First read all partition-by-partition (not paged).
|
||||
auto results1 = read_all_partitions_one_by_one(env.db(), s, pkeys);
|
||||
|
||||
uint64_t lookups = 0;
|
||||
|
||||
// Then do a paged range-query, with reader caching
|
||||
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t) {
|
||||
check_cache_population(env.db(), 1);
|
||||
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t page) {
|
||||
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::lookups);
|
||||
if (page) {
|
||||
tests::require(new_lookups > lookups);
|
||||
}
|
||||
lookups = new_lookups;
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses), 0u);
|
||||
}).first;
|
||||
@@ -451,13 +568,18 @@ SEASTAR_THREAD_TEST_CASE(test_read_all_multi_range) {
|
||||
db.set_querier_cache_entry_ttl(2s);
|
||||
}).get();
|
||||
|
||||
auto [s, pkeys] = test::create_test_table(env, KEYSPACE_NAME, "test_read_all");
|
||||
auto [s, pkeys] = create_test_table(env, KEYSPACE_NAME, get_name());
|
||||
|
||||
const auto limit = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
const auto slice = s->full_slice();
|
||||
|
||||
testlog.info("pkeys.size()={}", pkeys.size());
|
||||
|
||||
for (const auto step : {1ul, pkeys.size() / 4u, pkeys.size() / 2u}) {
|
||||
if (!step) {
|
||||
continue;
|
||||
}
|
||||
|
||||
dht::partition_range_vector ranges;
|
||||
ranges.push_back(dht::partition_range::make_ending_with({*pkeys.begin(), false}));
|
||||
@@ -506,7 +628,7 @@ SEASTAR_THREAD_TEST_CASE(test_read_with_partition_row_limits) {
|
||||
db.set_querier_cache_entry_ttl(2s);
|
||||
}).get();
|
||||
|
||||
auto [s, pkeys] = test::create_test_table(env, KEYSPACE_NAME, get_name(), 2, 10);
|
||||
auto [s, pkeys] = create_test_table(env, KEYSPACE_NAME, get_name(), 2, 10);
|
||||
|
||||
unsigned i = 0;
|
||||
|
||||
@@ -524,9 +646,15 @@ SEASTAR_THREAD_TEST_CASE(test_read_with_partition_row_limits) {
|
||||
// First read all partition-by-partition (not paged).
|
||||
auto results1 = read_all_partitions_one_by_one(env.db(), s, pkeys);
|
||||
|
||||
uint64_t lookups = 0;
|
||||
|
||||
// Then do a paged range-query, with reader caching
|
||||
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t) {
|
||||
check_cache_population(env.db(), 1);
|
||||
auto results2 = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t page) {
|
||||
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::lookups);
|
||||
if (page) {
|
||||
tests::require(new_lookups > lookups);
|
||||
}
|
||||
lookups = new_lookups;
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses), 0u);
|
||||
}).first;
|
||||
@@ -552,28 +680,41 @@ SEASTAR_THREAD_TEST_CASE(test_evict_a_shard_reader_on_each_page) {
|
||||
db.set_querier_cache_entry_ttl(2s);
|
||||
}).get();
|
||||
|
||||
auto [s, pkeys] = test::create_test_table(env, KEYSPACE_NAME, "test_evict_a_shard_reader_on_each_page");
|
||||
auto [s, pkeys] = create_test_table(env, KEYSPACE_NAME, get_name());
|
||||
|
||||
// First read all partition-by-partition (not paged).
|
||||
auto results1 = read_all_partitions_one_by_one(env.db(), s, pkeys);
|
||||
|
||||
int64_t lookups = 0;
|
||||
uint64_t evictions = 0;
|
||||
|
||||
// Then do a paged range-query
|
||||
auto [results2, npages] = read_all_partitions_with_paged_scan(env.db(), s, 4, stateful_query::yes, [&] (size_t page) {
|
||||
check_cache_population(env.db(), 1);
|
||||
const auto new_lookups = aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::lookups);
|
||||
if (page) {
|
||||
tests::require(new_lookups > lookups, std::source_location::current());
|
||||
}
|
||||
lookups = new_lookups;
|
||||
|
||||
env.db().invoke_on(page % smp::count, [&] (replica::database& db) {
|
||||
return db.get_querier_cache().evict_one();
|
||||
}).get();
|
||||
for (unsigned shard = 0; shard < smp::count; ++shard) {
|
||||
auto evicted = smp::submit_to(shard, [&] {
|
||||
return env.local_db().get_querier_cache().evict_one();
|
||||
}).get();
|
||||
if (evicted) {
|
||||
++evictions;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses), page);
|
||||
tests::require(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses) >= page, std::source_location::current());
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u, std::source_location::current());
|
||||
});
|
||||
|
||||
check_results_are_equal(results1, results2);
|
||||
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::time_based_evictions), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), npages);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), evictions);
|
||||
|
||||
require_eventually_empty_caches(env.db());
|
||||
|
||||
@@ -588,7 +729,7 @@ SEASTAR_THREAD_TEST_CASE(test_read_reversed) {
|
||||
|
||||
auto& db = env.db();
|
||||
|
||||
auto [s, pkeys] = test::create_test_table(env, KEYSPACE_NAME, get_name(), 4, 8);
|
||||
auto [s, pkeys] = create_test_table(env, KEYSPACE_NAME, get_name(), 4, 8);
|
||||
|
||||
unsigned i = 0;
|
||||
|
||||
@@ -719,196 +860,6 @@ struct serializer<blob_header> {
|
||||
|
||||
namespace {
|
||||
|
||||
const uint32_t min_blob_size = sizeof(blob_header);
|
||||
|
||||
static bytes make_payload(const schema& schema, size_t size, const partition_key& pk, const clustering_key* const ck) {
|
||||
assert(size >= min_blob_size);
|
||||
|
||||
blob_header head;
|
||||
head.size = size;
|
||||
|
||||
size_t size_needed = min_blob_size;
|
||||
|
||||
auto pk_components = pk.explode(schema);
|
||||
size_needed += calculate_serialized_size(pk_components);
|
||||
|
||||
head.includes_pk = size_needed <= size;
|
||||
head.has_ck = bool(ck);
|
||||
|
||||
auto ck_components = std::vector<bytes>{};
|
||||
if (ck) {
|
||||
ck_components = ck->explode(schema);
|
||||
size_needed += calculate_serialized_size(ck_components);
|
||||
}
|
||||
head.includes_ck = ck && size_needed <= size;
|
||||
|
||||
auto buf_os = buffer_ostream(size);
|
||||
ser::serialize(buf_os, head);
|
||||
if (head.includes_pk) {
|
||||
ser::serialize(buf_os, pk_components);
|
||||
}
|
||||
if (ck && head.includes_ck) {
|
||||
ser::serialize(buf_os, ck_components);
|
||||
}
|
||||
|
||||
return std::move(buf_os).detach();
|
||||
}
|
||||
|
||||
static bool validate_payload(const schema& schema, atomic_cell_value_view payload_view, const partition_key& pk, const clustering_key* const ck) {
|
||||
auto istream = fragmented_memory_input_stream(fragment_range(payload_view).begin(), payload_view.size());
|
||||
auto head = ser::deserialize(istream, boost::type<blob_header>{});
|
||||
|
||||
const size_t actual_size = payload_view.size();
|
||||
|
||||
if (head.size != actual_size) {
|
||||
testlog.error("Validating payload for pk={}, ck={} failed, sizes differ: stored={}, actual={}", pk, seastar::lazy_deref(ck), head.size,
|
||||
actual_size);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!head.includes_pk) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto stored_pk = partition_key::from_exploded(schema, ser::deserialize(istream, boost::type<std::vector<bytes>>{}));
|
||||
if (!stored_pk.equal(schema, pk)) {
|
||||
testlog.error("Validating payload for pk={}, ck={} failed, pks differ: stored={}, actual={}", pk, seastar::lazy_deref(ck), stored_pk, pk);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (bool(ck) != head.has_ck) {
|
||||
const auto stored = head.has_ck ? "clustering" : "static";
|
||||
const auto actual = ck ? "clustering" : "static";
|
||||
testlog.error("Validating payload for pk={}, ck={} failed, row types differ: stored={}, actual={}", pk, seastar::lazy_deref(ck), stored, actual);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!ck || !head.has_ck || !head.includes_ck) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto stored_ck = clustering_key::from_exploded(schema, ser::deserialize(istream, boost::type<std::vector<bytes>>{}));
|
||||
if (!stored_ck.equal(schema, *ck)) {
|
||||
testlog.error("Validating payload for pk={}, ck={} failed, cks differ: stored={}, actual={}", pk, seastar::lazy_deref(ck), stored_ck, *ck);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename RandomEngine>
|
||||
static test::population_description create_fuzzy_test_table(cql_test_env& env, RandomEngine& rnd_engine) {
|
||||
testlog.info("Generating combinations...");
|
||||
|
||||
const std::optional<std::uniform_int_distribution<int>> static_row_configurations[] = {
|
||||
std::nullopt,
|
||||
std::uniform_int_distribution<int>(min_blob_size, 100),
|
||||
std::uniform_int_distribution<int>( 101, 1'000),
|
||||
};
|
||||
|
||||
const std::uniform_int_distribution<int> clustering_row_count_configurations[] = {
|
||||
std::uniform_int_distribution<int>( 0, 2),
|
||||
std::uniform_int_distribution<int>( 3, 49),
|
||||
std::uniform_int_distribution<int>(50, 999),
|
||||
};
|
||||
|
||||
const std::uniform_int_distribution<int> clustering_row_configurations[] = {
|
||||
std::uniform_int_distribution<int>( min_blob_size, min_blob_size + 10),
|
||||
std::uniform_int_distribution<int>(min_blob_size + 11, 200),
|
||||
std::uniform_int_distribution<int>( 201, 1'000),
|
||||
};
|
||||
|
||||
// std::pair(count, size)
|
||||
const std::pair<std::uniform_int_distribution<int>, std::uniform_int_distribution<int>> range_deletion_configurations[] = {
|
||||
std::pair(std::uniform_int_distribution<int>(0, 1), std::uniform_int_distribution<int>(1, 2)),
|
||||
std::pair(std::uniform_int_distribution<int>(1, 2), std::uniform_int_distribution<int>(1, 3)),
|
||||
std::pair(std::uniform_int_distribution<int>(1, 2), std::uniform_int_distribution<int>(4, 7)),
|
||||
std::pair(std::uniform_int_distribution<int>(3, 9), std::uniform_int_distribution<int>(2, 9)),
|
||||
std::pair(std::uniform_int_distribution<int>(30, 99), std::uniform_int_distribution<int>(1, 2)),
|
||||
};
|
||||
|
||||
std::vector<test::partition_configuration> partition_configs;
|
||||
|
||||
auto count_for = [] (size_t s, size_t cc, size_t cs, size_t d) {
|
||||
#ifdef DEBUG
|
||||
const auto tier1_count = 1;
|
||||
const auto tier2_count = 1;
|
||||
const auto tier3_count = 0;
|
||||
#else
|
||||
const auto tier1_count = 100;
|
||||
const auto tier2_count = 10;
|
||||
const auto tier3_count = 2;
|
||||
#endif
|
||||
if (s > 1 || cc > 1 || cs > 1 || d > 1) {
|
||||
return tier3_count;
|
||||
}
|
||||
if (s > 0 || cc > 0 || cs > 0 || d > 0) {
|
||||
return tier2_count;
|
||||
}
|
||||
return tier1_count;
|
||||
};
|
||||
|
||||
// Generate (almost) all combinations of the above.
|
||||
for (size_t s = 0; s < std::size(static_row_configurations); ++s) {
|
||||
for (size_t cc = 0; cc < std::size(clustering_row_count_configurations); ++cc) {
|
||||
// We don't want to generate partitions that are too huge.
|
||||
// So don't allow loads of large rows. This is achieved by
|
||||
// omitting from the last (largest) size configurations when the
|
||||
// larger count configurations are reached.
|
||||
const size_t omit_from_end = std::max(0, int(cc) - 1);
|
||||
for (size_t cs = 0; cs < std::size(clustering_row_configurations) - omit_from_end; ++cs) {
|
||||
for (size_t d = 0; d < std::size(range_deletion_configurations); ++d) {
|
||||
const auto count = count_for(s, cc, cs, d);
|
||||
const auto [range_delete_count_config, range_delete_size_config] = range_deletion_configurations[d];
|
||||
partition_configs.push_back(test::partition_configuration{
|
||||
static_row_configurations[s],
|
||||
clustering_row_count_configurations[cc],
|
||||
clustering_row_configurations[cs],
|
||||
range_delete_count_config,
|
||||
range_delete_size_config,
|
||||
count});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto config_distribution = std::uniform_int_distribution<int>(0, 1);
|
||||
const auto extreme_static_row_dist = std::uniform_int_distribution<int>(1'001, 10'000);
|
||||
const auto extreme_clustering_row_count_dist = std::uniform_int_distribution<int>(1'000, 10'000);
|
||||
const auto extreme_clustering_row_dist = std::uniform_int_distribution<int>(1'001, 10'000);
|
||||
|
||||
// Extreme cases, just one of each.
|
||||
const auto [range_delete_count_config, range_delete_size_config] = range_deletion_configurations[config_distribution(rnd_engine)];
|
||||
partition_configs.push_back(test::partition_configuration{
|
||||
extreme_static_row_dist,
|
||||
clustering_row_count_configurations[config_distribution(rnd_engine)],
|
||||
clustering_row_configurations[config_distribution(rnd_engine)],
|
||||
range_delete_count_config,
|
||||
range_delete_size_config,
|
||||
1});
|
||||
partition_configs.push_back(test::partition_configuration{
|
||||
static_row_configurations[config_distribution(rnd_engine)],
|
||||
extreme_clustering_row_count_dist,
|
||||
clustering_row_configurations[config_distribution(rnd_engine)],
|
||||
range_delete_count_config,
|
||||
range_delete_size_config,
|
||||
1});
|
||||
partition_configs.push_back(test::partition_configuration{
|
||||
static_row_configurations[config_distribution(rnd_engine)],
|
||||
clustering_row_count_configurations[config_distribution(rnd_engine)],
|
||||
extreme_clustering_row_dist,
|
||||
range_delete_count_config,
|
||||
range_delete_size_config,
|
||||
1});
|
||||
|
||||
const auto partition_count = boost::accumulate(partition_configs, size_t(0),
|
||||
[] (size_t c, const test::partition_configuration& part_config) { return c + part_config.count; });
|
||||
|
||||
testlog.info("Done. Generated {} combinations, {} partitions in total.", partition_configs.size(), partition_count);
|
||||
|
||||
return test::create_test_table(env, KEYSPACE_NAME, "fuzzy_test", rnd_engine(), std::move(partition_configs), &make_payload);
|
||||
}
|
||||
|
||||
template <typename RandomEngine>
|
||||
static nonwrapping_range<int> generate_range(RandomEngine& rnd_engine, int start, int end, bool allow_open_ended_start = true) {
|
||||
assert(start < end);
|
||||
@@ -943,13 +894,19 @@ static nonwrapping_range<int> generate_range(RandomEngine& rnd_engine, int start
|
||||
|
||||
template <typename RandomEngine>
|
||||
static query::clustering_row_ranges
|
||||
generate_clustering_ranges(RandomEngine& rnd_engine, const schema& schema, const std::vector<test::partition_description>& part_descs) {
|
||||
generate_clustering_ranges(RandomEngine& rnd_engine, const schema& schema, const std::vector<mutation>& mutations) {
|
||||
if (!schema.clustering_key_size()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<clustering_key> all_cks;
|
||||
std::set<clustering_key, clustering_key::less_compare> all_cks_sorted{clustering_key::less_compare(schema)};
|
||||
|
||||
for (const auto& part_desc : part_descs) {
|
||||
all_cks_sorted.insert(part_desc.live_rows.cbegin(), part_desc.live_rows.cend());
|
||||
all_cks_sorted.insert(part_desc.dead_rows.cbegin(), part_desc.dead_rows.cend());
|
||||
for (const auto& mut : mutations) {
|
||||
for (const auto& row : mut.partition().clustered_rows()) {
|
||||
all_cks_sorted.insert(row.key());
|
||||
}
|
||||
thread::maybe_yield();
|
||||
}
|
||||
all_cks.reserve(all_cks_sorted.size());
|
||||
all_cks.insert(all_cks.end(), all_cks_sorted.cbegin(), all_cks_sorted.cend());
|
||||
@@ -975,56 +932,33 @@ generate_clustering_ranges(RandomEngine& rnd_engine, const schema& schema, const
|
||||
return clustering_key_ranges;
|
||||
}
|
||||
|
||||
struct expected_partition {
|
||||
const dht::decorated_key& dkey;
|
||||
bool has_static_row;
|
||||
std::vector<clustering_key> live_rows;
|
||||
std::vector<clustering_key> dead_rows;
|
||||
query::clustering_row_ranges range_tombstones;
|
||||
};
|
||||
|
||||
static std::vector<expected_partition>
|
||||
slice_partitions(const schema& schema, const std::vector<test::partition_description>& partitions,
|
||||
static std::vector<mutation>
|
||||
slice_partitions(const schema& schema, const std::vector<mutation>& partitions,
|
||||
const nonwrapping_range<int>& partition_index_range, const query::partition_slice& slice) {
|
||||
const auto& sb = partition_index_range.start();
|
||||
const auto& eb = partition_index_range.end();
|
||||
auto it = sb ? partitions.cbegin() + sb->value() + !sb->is_inclusive() : partitions.cbegin();
|
||||
const auto end = eb ? partitions.cbegin() + eb->value() + eb->is_inclusive() : partitions.cend();
|
||||
|
||||
const auto tri_cmp = clustering_key::tri_compare(schema);
|
||||
const auto& row_ranges = slice.default_row_ranges();
|
||||
|
||||
std::vector<expected_partition> sliced_partitions;
|
||||
std::vector<mutation> sliced_partitions;
|
||||
for (;it != end; ++it) {
|
||||
auto sliced_live_rows = test::slice_keys(schema, it->live_rows, row_ranges);
|
||||
auto sliced_dead_rows = test::slice_keys(schema, it->dead_rows, row_ranges);
|
||||
|
||||
query::clustering_row_ranges overlapping_range_tombstones;
|
||||
std::copy_if(it->range_tombstones.cbegin(), it->range_tombstones.cend(), std::back_inserter(overlapping_range_tombstones),
|
||||
[&] (const query::clustering_range& rt) {
|
||||
return std::any_of(row_ranges.cbegin(), row_ranges.cend(), [&] (const query::clustering_range& r) {
|
||||
return rt.overlaps(r, clustering_key::tri_compare(schema));
|
||||
});
|
||||
});
|
||||
|
||||
if (sliced_live_rows.empty() && sliced_dead_rows.empty() && overlapping_range_tombstones.empty() && !it->has_static_row) {
|
||||
continue;
|
||||
}
|
||||
sliced_partitions.emplace_back(expected_partition{it->dkey, it->has_static_row, std::move(sliced_live_rows), std::move(sliced_dead_rows),
|
||||
std::move(overlapping_range_tombstones)});
|
||||
sliced_partitions.push_back(it->sliced(row_ranges));
|
||||
thread::maybe_yield();
|
||||
}
|
||||
return sliced_partitions;
|
||||
}
|
||||
|
||||
static void
|
||||
validate_result_size(size_t i, schema_ptr schema, const std::vector<mutation>& results, const std::vector<expected_partition>& expected_partitions) {
|
||||
validate_result_size(size_t i, schema_ptr schema, const std::vector<mutation>& results, const std::vector<mutation>& expected_partitions) {
|
||||
if (results.size() == expected_partitions.size()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto expected = std::set<dht::decorated_key, dht::decorated_key::less_comparator>(dht::decorated_key::less_comparator(schema));
|
||||
for (const auto& p : expected_partitions) {
|
||||
expected.insert(p.dkey);
|
||||
expected.insert(p.decorated_key());
|
||||
}
|
||||
|
||||
auto actual = std::set<dht::decorated_key, dht::decorated_key::less_comparator>(dht::decorated_key::less_comparator(schema));
|
||||
@@ -1047,146 +981,6 @@ validate_result_size(size_t i, schema_ptr schema, const std::vector<mutation>& r
|
||||
}
|
||||
}
|
||||
|
||||
[[nodiscard]] static bool validate_row(const schema& s, const partition_key& pk, const clustering_key* const ck, column_kind kind, const row& r) {
|
||||
bool OK = true;
|
||||
const auto& cdef = s.column_at(kind, 0);
|
||||
if (auto* cell = r.find_cell(0)) {
|
||||
OK &= tests::check(validate_payload(s, cell->as_atomic_cell(cdef).value(), pk, ck));
|
||||
}
|
||||
return OK;
|
||||
}
|
||||
|
||||
[[nodiscard]] static bool validate_static_row(const schema& s, const partition_key& pk, const row& sr) {
|
||||
return validate_row(s, pk, {}, column_kind::static_column, sr);
|
||||
}
|
||||
|
||||
[[nodiscard]] static bool validate_regular_row(const schema& s, const partition_key& pk, const clustering_key& ck, const deletable_row& dr) {
|
||||
return validate_row(s, pk, &ck, column_kind::regular_column, dr.cells());
|
||||
}
|
||||
|
||||
struct pkey_with_schema {
|
||||
const dht::decorated_key& key;
|
||||
const schema& s;
|
||||
bool operator==(const pkey_with_schema& pk) const {
|
||||
return key.equal(s, pk.key);
|
||||
}
|
||||
};
|
||||
|
||||
static std::ostream& operator<<(std::ostream& os, const pkey_with_schema& pk) {
|
||||
os << pk.key;
|
||||
return os;
|
||||
}
|
||||
|
||||
struct ckey_with_schema {
|
||||
const clustering_key& key;
|
||||
const schema& s;
|
||||
bool operator==(const ckey_with_schema& ck) const {
|
||||
return key.equal(s, ck.key);
|
||||
}
|
||||
};
|
||||
|
||||
static std::ostream& operator<<(std::ostream& os, const ckey_with_schema& ck) {
|
||||
os << ck.key;
|
||||
return os;
|
||||
}
|
||||
|
||||
struct with_schema_wrapper {
|
||||
const schema& s;
|
||||
|
||||
pkey_with_schema operator()(const dht::decorated_key& pkey) const {
|
||||
return pkey_with_schema{pkey, s};
|
||||
}
|
||||
ckey_with_schema operator()(const clustering_key& ckey) const {
|
||||
return ckey_with_schema{ckey, s};
|
||||
}
|
||||
};
|
||||
|
||||
static void validate_result(size_t i, const mutation& result_mut, const expected_partition& expected_part) {
|
||||
testlog.trace("[scan#{}]: validating {}: has_static_row={}, live_rows={}, dead_rows={}", i, expected_part.dkey, expected_part.has_static_row,
|
||||
expected_part.live_rows, expected_part.dead_rows);
|
||||
|
||||
auto& schema = *result_mut.schema();
|
||||
const auto wrapper = with_schema_wrapper{schema};
|
||||
|
||||
bool OK = true;
|
||||
|
||||
tests::require_equal(result_mut.partition().static_row().empty(), !expected_part.has_static_row);
|
||||
OK &= validate_static_row(schema, expected_part.dkey.key(), result_mut.partition().static_row().get());
|
||||
|
||||
const auto& res_rows = result_mut.partition().clustered_rows();
|
||||
auto res_it = res_rows.begin();
|
||||
const auto res_end = res_rows.end();
|
||||
|
||||
auto exp_live_it = expected_part.live_rows.cbegin();
|
||||
const auto exp_live_end = expected_part.live_rows.cend();
|
||||
|
||||
auto exp_dead_it = expected_part.dead_rows.cbegin();
|
||||
const auto exp_dead_end = expected_part.dead_rows.cend();
|
||||
|
||||
for (; res_it != res_end && (exp_live_it != exp_live_end || exp_dead_it != exp_dead_end); ++res_it) {
|
||||
const bool is_live = res_it->row().is_live(schema, column_kind::regular_column);
|
||||
|
||||
// Check that we have remaining expected rows of the respective liveness.
|
||||
if (is_live) {
|
||||
tests::require(exp_live_it != exp_live_end);
|
||||
} else {
|
||||
tests::require(exp_dead_it != exp_dead_end);
|
||||
}
|
||||
|
||||
testlog.trace("[scan#{}]: validating {}/{}: is_live={}", i, expected_part.dkey, res_it->key(), is_live);
|
||||
|
||||
if (is_live) {
|
||||
OK &= tests::check_equal(wrapper(res_it->key()), wrapper(*exp_live_it++));
|
||||
} else {
|
||||
// FIXME: Only a fraction of the dead rows is present in the result.
|
||||
if (!res_it->key().equal(schema, *exp_dead_it)) {
|
||||
testlog.trace("[scan#{}]: validating {}/{}: dead row in the result is not the expected one: {} != {}", i, expected_part.dkey,
|
||||
res_it->key(), res_it->key(), *exp_dead_it);
|
||||
}
|
||||
|
||||
// The dead row is not the one we expected it to be. Check that at
|
||||
// least that it *is* among the expected dead rows.
|
||||
if (!res_it->key().equal(schema, *exp_dead_it)) {
|
||||
auto it = std::find_if(exp_dead_it, exp_dead_end, [&] (const clustering_key& key) {
|
||||
return key.equal(schema, res_it->key());
|
||||
});
|
||||
OK &= tests::check(it != exp_dead_it);
|
||||
|
||||
testlog.trace("[scan#{}]: validating {}/{}: skipped over {} expected dead rows", i, expected_part.dkey,
|
||||
res_it->key(), std::distance(exp_dead_it, it));
|
||||
exp_dead_it = it;
|
||||
}
|
||||
++exp_dead_it;
|
||||
}
|
||||
OK &= validate_regular_row(schema, expected_part.dkey.key(), res_it->key(), res_it->row());
|
||||
}
|
||||
|
||||
// We don't want to call res_rows.calculate_size() as it has linear complexity.
|
||||
// Instead, check that after iterating through the results and expected
|
||||
// results in lock-step, both have reached the end.
|
||||
OK &= tests::check(res_it == res_end);
|
||||
if (res_it != res_end) {
|
||||
testlog.error("[scan#{}]: validating {} failed: result contains unexpected trailing rows: {}", i, expected_part.dkey,
|
||||
boost::copy_range<std::vector<clustering_key>>(
|
||||
boost::iterator_range<mutation_partition::rows_type::const_iterator>(res_it, res_end)
|
||||
| boost::adaptors::transformed([] (const rows_entry& e) { return e.key(); })));
|
||||
}
|
||||
|
||||
OK &= tests::check(exp_live_it == exp_live_end);
|
||||
if (exp_live_it != exp_live_end) {
|
||||
testlog.error("[scan#{}]: validating {} failed: {} expected live rows missing from result", i, expected_part.dkey,
|
||||
std::distance(exp_live_it, exp_live_end));
|
||||
}
|
||||
|
||||
// FIXME: see note about dead rows above.
|
||||
if (exp_dead_it != exp_dead_end) {
|
||||
testlog.trace("[scan#{}]: validating {}: {} expected dead rows missing from result", i, expected_part.dkey,
|
||||
std::distance(exp_dead_it, exp_dead_end));
|
||||
}
|
||||
|
||||
tests::require(OK);
|
||||
}
|
||||
|
||||
struct fuzzy_test_config {
|
||||
uint32_t seed;
|
||||
std::chrono::seconds timeout;
|
||||
@@ -1196,17 +990,23 @@ struct fuzzy_test_config {
|
||||
|
||||
static void
|
||||
run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, distributed<replica::database>& db, schema_ptr schema,
|
||||
const std::vector<test::partition_description>& part_descs) {
|
||||
const std::vector<frozen_mutation>& frozen_mutations) {
|
||||
const auto seed = cfg.seed + (i + 1) * this_shard_id();
|
||||
auto rnd_engine = std::mt19937(seed);
|
||||
|
||||
auto partition_index_range = generate_range(rnd_engine, 0, part_descs.size() - 1);
|
||||
auto partition_range = partition_index_range.transform([&part_descs] (int i) {
|
||||
return dht::ring_position(part_descs[i].dkey);
|
||||
std::vector<mutation> mutations;
|
||||
for (const auto& mut : frozen_mutations) {
|
||||
mutations.emplace_back(mut.unfreeze(schema));
|
||||
thread::maybe_yield();
|
||||
}
|
||||
|
||||
auto partition_index_range = generate_range(rnd_engine, 0, mutations.size() - 1);
|
||||
auto partition_range = partition_index_range.transform([&mutations] (int i) {
|
||||
return dht::ring_position(mutations[i].decorated_key());
|
||||
});
|
||||
|
||||
const auto partition_slice = partition_slice_builder(*schema)
|
||||
.with_ranges(generate_clustering_ranges(rnd_engine, *schema, part_descs))
|
||||
.with_ranges(generate_clustering_ranges(rnd_engine, *schema, mutations))
|
||||
.with_option<query::partition_slice::option::allow_short_read>()
|
||||
.build();
|
||||
|
||||
@@ -1217,16 +1017,15 @@ run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, distributed<replica::databa
|
||||
|
||||
const auto [results, npages] = read_partitions_with_paged_scan(db, schema, 1000, 1024, is_stateful, partition_range, partition_slice);
|
||||
|
||||
const auto expected_partitions = slice_partitions(*schema, part_descs, partition_index_range, partition_slice);
|
||||
const auto expected_partitions = slice_partitions(*schema, mutations, partition_index_range, partition_slice);
|
||||
|
||||
validate_result_size(i, schema, results, expected_partitions);
|
||||
|
||||
const auto wrapper = with_schema_wrapper{*schema};
|
||||
auto exp_it = expected_partitions.cbegin();
|
||||
auto res_it = results.cbegin();
|
||||
while (res_it != results.cend() && exp_it != expected_partitions.cend()) {
|
||||
tests::require_equal(wrapper(res_it->decorated_key()), wrapper(exp_it->dkey));
|
||||
validate_result(i, *res_it++, *exp_it++);
|
||||
assert_that(*res_it++).is_equal_to(*exp_it++);
|
||||
thread::maybe_yield();
|
||||
}
|
||||
|
||||
testlog.trace("[scan#{}]: validated all partitions, both the expected and actual partition list should be exhausted now", i);
|
||||
@@ -1266,10 +1065,10 @@ future<> run_concurrently(size_t count, size_t concurrency, noncopyable_function
|
||||
|
||||
static future<>
|
||||
run_fuzzy_test_workload(fuzzy_test_config cfg, distributed<replica::database>& db, schema_ptr schema,
|
||||
const std::vector<test::partition_description>& part_descs) {
|
||||
return run_concurrently(cfg.scans, cfg.concurrency, [cfg, &db, schema = std::move(schema), &part_descs] (size_t i) {
|
||||
return seastar::async([i, cfg, &db, schema, &part_descs] () mutable {
|
||||
run_fuzzy_test_scan(i, cfg, db, std::move(schema), part_descs);
|
||||
const std::vector<frozen_mutation>& frozen_mutations) {
|
||||
return run_concurrently(cfg.scans, cfg.concurrency, [cfg, &db, schema = std::move(schema), &frozen_mutations] (size_t i) {
|
||||
return seastar::async([i, cfg, &db, schema, &frozen_mutations] () mutable {
|
||||
run_fuzzy_test_scan(i, cfg, db, std::move(schema), frozen_mutations);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1285,24 +1084,35 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
|
||||
const auto seed = tests::random::get_int<uint32_t>();
|
||||
testlog.info("fuzzy test seed: {}", seed);
|
||||
|
||||
auto rnd_engine = std::mt19937(seed);
|
||||
|
||||
auto pop_desc = create_fuzzy_test_table(env, rnd_engine);
|
||||
auto tbl = create_test_table(env, seed, "ks", get_name(), false,
|
||||
#ifdef DEBUG
|
||||
std::uniform_int_distribution<size_t>(8, 32), // partitions
|
||||
std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 10), // range-tombstones
|
||||
#elif DEVEL
|
||||
std::uniform_int_distribution<size_t>(16, 64), // partitions
|
||||
std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 100), // range-tombstones
|
||||
#else
|
||||
std::uniform_int_distribution<size_t>(32, 64), // partitions
|
||||
std::uniform_int_distribution<size_t>(0, 1000), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 1000), // range-tombstones
|
||||
#endif
|
||||
tests::default_timestamp_generator());
|
||||
|
||||
#if defined DEBUG
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{8}, 1, 1};
|
||||
#elif defined DEVEL
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 8, 4};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 2, 4};
|
||||
#else
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 16, 256};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 4, 8};
|
||||
#endif
|
||||
|
||||
testlog.info("Running test workload with configuration: seed={}, timeout={}s, concurrency={}, scans={}", cfg.seed, cfg.timeout.count(),
|
||||
cfg.concurrency, cfg.scans);
|
||||
|
||||
const auto& partitions = pop_desc.partitions;
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(pop_desc.schema), &partitions] {
|
||||
return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions);
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(tbl.schema), &compacted_frozen_mutations = tbl.compacted_frozen_mutations] {
|
||||
return run_fuzzy_test_workload(cfg, *db, gs.get(), compacted_frozen_mutations);
|
||||
}).handle_exception([seed] (std::exception_ptr e) {
|
||||
testlog.error("Test workload failed with exception {}."
|
||||
" To repeat this particular run, replace the random seed of the test, with that of this run ({})."
|
||||
|
||||
@@ -37,5 +37,7 @@ custom_args:
|
||||
- '-c2 -m2G --fail-on-abandoned-failed-futures=true'
|
||||
reader_concurrency_semaphore_test:
|
||||
- '-c1 -m256M'
|
||||
multishard_mutation_query_test:
|
||||
- '-c2 -m3G'
|
||||
run_in_debug:
|
||||
- logalloc_standard_allocator_segment_pool_backend_test
|
||||
|
||||
@@ -1,468 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "test/lib/test_table.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
|
||||
namespace test {
|
||||
|
||||
std::vector<clustering_key> slice_keys(const schema& schema, const std::vector<clustering_key>& keys, const query::clustering_row_ranges& ranges) {
|
||||
if (keys.empty() || ranges.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const auto tri_cmp = clustering_key::tri_compare(schema);
|
||||
std::vector<clustering_key> sliced_keys;
|
||||
|
||||
auto keys_it = keys.begin();
|
||||
const auto keys_end = keys.end();
|
||||
auto ranges_it = ranges.begin();
|
||||
const auto ranges_end = ranges.end();
|
||||
|
||||
while (keys_it != keys_end && ranges_it != ranges_end) {
|
||||
if (ranges_it->before(*keys_it, tri_cmp)) {
|
||||
++keys_it; // Key is before current range: skip.
|
||||
continue;
|
||||
}
|
||||
if (ranges_it->contains(*keys_it, tri_cmp)) {
|
||||
sliced_keys.push_back(*keys_it++);
|
||||
} else {
|
||||
// Key is after current range: go to next range.
|
||||
++ranges_it;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return sliced_keys;
|
||||
}
|
||||
|
||||
std::pair<schema_ptr, std::vector<dht::decorated_key>> create_test_table(cql_test_env& env, const sstring& ks_name, const sstring& table_name,
|
||||
int partition_count, int row_per_partition_count) {
|
||||
class simple_population_generator : public population_generator {
|
||||
class simple_partition_content_generator : public partition_content_generator {
|
||||
const int _pk;
|
||||
int _ck = 0;
|
||||
const int _row_count;
|
||||
|
||||
public:
|
||||
explicit simple_partition_content_generator(int pk, int row_count) : _pk(pk), _row_count(row_count) {
|
||||
}
|
||||
virtual partition_key generate_partition_key(const schema& schema) override {
|
||||
return partition_key::from_single_value(schema, serialized(_pk));
|
||||
}
|
||||
virtual bool has_static_row() override {
|
||||
return false;
|
||||
}
|
||||
virtual bytes generate_static_row(const schema&, const partition_key&) override {
|
||||
return {};
|
||||
}
|
||||
virtual int clustering_row_count() override {
|
||||
return _row_count;
|
||||
}
|
||||
virtual row generate_row(const schema& schema, const partition_key&) override {
|
||||
const auto ck = _ck++;
|
||||
const auto data = _pk ^ ck;
|
||||
auto buf = bytes(bytes::initialized_later{}, sizeof(int));
|
||||
std::copy_n(&data, 1, reinterpret_cast<int*>(buf.data()));
|
||||
return row{clustering_key::from_single_value(schema, serialized(ck)), std::move(buf)};
|
||||
}
|
||||
virtual query::clustering_row_ranges generate_delete_ranges(const schema&, const std::vector<clustering_key>&) override {
|
||||
return {};
|
||||
}
|
||||
};
|
||||
const int _partition_count;
|
||||
const int _row_count;
|
||||
int _pk = 0;
|
||||
public:
|
||||
explicit simple_population_generator(int partition_count, int row_count) : _partition_count(partition_count), _row_count(row_count) {
|
||||
}
|
||||
virtual size_t partition_count() override {
|
||||
return _partition_count;
|
||||
}
|
||||
virtual std::unique_ptr<partition_content_generator> make_partition_content_generator() override {
|
||||
return std::make_unique<simple_partition_content_generator>(_pk++, _row_count);
|
||||
}
|
||||
};
|
||||
auto pop_desc = create_test_table(env, ks_name, table_name,
|
||||
std::make_unique<simple_population_generator>(partition_count, row_per_partition_count));
|
||||
std::vector<dht::decorated_key> pkeys;
|
||||
pkeys.reserve(pop_desc.partitions.size());
|
||||
for (auto& part : pop_desc.partitions) {
|
||||
pkeys.emplace_back(std::move(part.dkey));
|
||||
}
|
||||
return std::pair(std::move(pop_desc.schema), std::move(pkeys));
|
||||
}
|
||||
|
||||
population_description create_test_table(cql_test_env& env, const sstring& ks_name, const sstring& table_name, uint32_t seed,
|
||||
std::vector<partition_configuration> part_configs, generate_blob_function gen_blob) {
|
||||
class configurable_random_partition_content_generator : public test::partition_content_generator {
|
||||
std::mt19937& _engine;
|
||||
partition_configuration& _config;
|
||||
generate_blob_function& _gen_blob;
|
||||
std::uniform_int_distribution<int> _key_dist;
|
||||
|
||||
public:
|
||||
configurable_random_partition_content_generator(std::mt19937& engine, partition_configuration& config, generate_blob_function& gen_blob)
|
||||
: _engine(engine)
|
||||
, _config(config)
|
||||
, _gen_blob(gen_blob)
|
||||
, _key_dist(std::numeric_limits<int>::min(), std::numeric_limits<int>::max()) {
|
||||
}
|
||||
virtual partition_key generate_partition_key(const schema& schema) override {
|
||||
return partition_key::from_single_value(schema, serialized(_key_dist(_engine)));
|
||||
}
|
||||
virtual bool has_static_row() override {
|
||||
return _config.static_row_size_dist.has_value();
|
||||
}
|
||||
virtual bytes generate_static_row(const schema& schema, const partition_key& pk) override {
|
||||
return _gen_blob(schema, (*_config.static_row_size_dist)(_engine), pk, nullptr);
|
||||
}
|
||||
virtual int clustering_row_count() override {
|
||||
return _config.clustering_row_count_dist(_engine);
|
||||
}
|
||||
virtual row generate_row(const schema& schema, const partition_key& pk) override {
|
||||
auto ck = clustering_key::from_single_value(schema, serialized(_key_dist(_engine)));
|
||||
auto value = _gen_blob(schema, _config.clustering_row_size_dist(_engine), pk, &ck);
|
||||
return row{std::move(ck), std::move(value)};
|
||||
}
|
||||
virtual query::clustering_row_ranges generate_delete_ranges(const schema& schema, const std::vector<clustering_key>& rows) override {
|
||||
const auto count = _config.range_deletion_count_dist(_engine);
|
||||
if (!count) {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::uniform_int_distribution<int> index_dist(0, rows.size() - 1);
|
||||
std::uniform_int_distribution<int> inclusive_dist(0, 1);
|
||||
|
||||
using range_bound = query::clustering_range::bound;
|
||||
|
||||
query::clustering_row_ranges ranges;
|
||||
for (auto i = 0; i < count - 1; ++i) {
|
||||
const auto size = _config.range_deletion_size_dist(_engine);
|
||||
if (size == 0) {
|
||||
continue;
|
||||
}
|
||||
if (size >= int(rows.size())) {
|
||||
ranges.emplace_back(range_bound(rows.front(), true), range_bound(rows.back(), true));
|
||||
continue;
|
||||
}
|
||||
|
||||
const bool b1_inclusive = inclusive_dist(_engine);
|
||||
const bool b2_inclusive = size_t(size) == rows.size() - 1 ? true : inclusive_dist(_engine);
|
||||
|
||||
auto param = std::uniform_int_distribution<int>::param_type(0, rows.size() - size - !b1_inclusive - !b2_inclusive);
|
||||
const auto b1 = index_dist(_engine, param);
|
||||
|
||||
const auto b2 = b1 + size - 1 + !b1_inclusive + !b2_inclusive;
|
||||
ranges.emplace_back(range_bound(rows.at(b1), b1_inclusive), range_bound(rows.at(b2), b2_inclusive));
|
||||
}
|
||||
return ranges;
|
||||
}
|
||||
};
|
||||
|
||||
class configurable_random_population_generator : public test::population_generator {
|
||||
std::mt19937 _engine;
|
||||
std::vector<partition_configuration> _part_configs;
|
||||
size_t _count;
|
||||
generate_blob_function _gen_blob;
|
||||
|
||||
public:
|
||||
configurable_random_population_generator(uint32_t seed, std::vector<partition_configuration> part_configs, generate_blob_function gen_blob)
|
||||
: _engine(seed)
|
||||
, _part_configs(std::move(part_configs))
|
||||
, _count(boost::accumulate(_part_configs, size_t(0), [] (size_t c, const partition_configuration& part_config) {
|
||||
return c + part_config.count;
|
||||
}))
|
||||
, _gen_blob(std::move(gen_blob)) {
|
||||
}
|
||||
virtual size_t partition_count() override {
|
||||
return _count;
|
||||
}
|
||||
virtual std::unique_ptr<partition_content_generator> make_partition_content_generator() override {
|
||||
const auto index = std::uniform_int_distribution<int>(0, _part_configs.size() - 1)(_engine);
|
||||
auto& partition_config = _part_configs.at(index);
|
||||
auto gen = std::make_unique<configurable_random_partition_content_generator>(_engine, partition_config, _gen_blob);
|
||||
if (!--partition_config.count) {
|
||||
std::swap(partition_config, _part_configs.back());
|
||||
_part_configs.pop_back();
|
||||
}
|
||||
return gen;
|
||||
}
|
||||
};
|
||||
return create_test_table(env, ks_name, table_name, std::make_unique<configurable_random_population_generator>(seed, std::move(part_configs),
|
||||
std::move(gen_blob)));
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
static size_t maybe_generate_static_row(cql_test_env& env, const schema& schema,partition_content_generator& part_gen,
|
||||
const cql3::prepared_cache_key_type& static_insert_id, const partition_description& part_desc) {
|
||||
if (!part_gen.has_static_row()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
auto value = part_gen.generate_static_row(schema, part_desc.dkey.key());
|
||||
const auto size = value.size();
|
||||
|
||||
env.execute_prepared(static_insert_id, {{
|
||||
cql3::raw_value::make_value(to_bytes(part_desc.dkey.key().get_component(schema, 0))),
|
||||
cql3::raw_value::make_value(serialized(std::move(value)))}}).get();
|
||||
return size;
|
||||
}
|
||||
|
||||
struct clustering_row_generation_result {
|
||||
size_t written_bytes = 0;
|
||||
std::vector<clustering_key> live_rows;
|
||||
std::vector<clustering_key> dead_rows;
|
||||
query::clustering_row_ranges range_tombstones;
|
||||
};
|
||||
|
||||
class clustering_range_less_compare {
|
||||
clustering_key::tri_compare _tri_cmp;
|
||||
public:
|
||||
explicit clustering_range_less_compare(const schema& schema) : _tri_cmp(schema) {
|
||||
}
|
||||
bool operator()(const query::clustering_range& a, const query::clustering_range& b) const {
|
||||
if (!a.start() || !b.start()) {
|
||||
return !a.start();
|
||||
}
|
||||
if (auto res = _tri_cmp(a.start()->value(), b.start()->value()); res != 0) {
|
||||
return res < 0;
|
||||
}
|
||||
return a.start()->is_inclusive() < b.start()->is_inclusive();
|
||||
}
|
||||
};
|
||||
|
||||
static clustering_row_generation_result generate_clustering_rows(
|
||||
cql_test_env& env,
|
||||
const schema& schema,
|
||||
partition_content_generator& part_gen,
|
||||
const cql3::prepared_cache_key_type& clustering_insert_id,
|
||||
const std::array<std::array<cql3::prepared_cache_key_type, 2>, 2>& clustering_delete_id_mappings,
|
||||
const partition_description& part_desc) {
|
||||
// If the partition has no static row and no clustering rows it will not exist.
|
||||
// Make sure each partition has at least one row.
|
||||
const auto clustering_row_count = std::max(int(!part_desc.has_static_row), part_gen.clustering_row_count());
|
||||
if (!clustering_row_count) {
|
||||
return clustering_row_generation_result{};
|
||||
}
|
||||
|
||||
size_t written_bytes = 0;
|
||||
|
||||
// The generator is allowed to produce duplicates.
|
||||
// Use a set to de-duplicate rows (and while at it keep them sorted).
|
||||
std::set<clustering_key, clustering_key::less_compare> written_rows{clustering_key::less_compare(schema)};
|
||||
|
||||
for (auto j = 0; j < clustering_row_count; ++j) {
|
||||
auto [key, value] = part_gen.generate_row(schema, part_desc.dkey.key());
|
||||
|
||||
written_bytes += key.external_memory_usage();
|
||||
written_bytes += value.size();
|
||||
|
||||
env.execute_prepared(clustering_insert_id, {{
|
||||
cql3::raw_value::make_value(to_bytes(part_desc.dkey.key().get_component(schema, 0))),
|
||||
cql3::raw_value::make_value(to_bytes(key.get_component(schema, 0))),
|
||||
cql3::raw_value::make_value(serialized(std::move(value)))}}).get();
|
||||
written_rows.insert(std::move(key));
|
||||
}
|
||||
|
||||
std::vector<clustering_key> rows;
|
||||
rows.reserve(written_rows.size());
|
||||
std::move(written_rows.begin(), written_rows.end(), std::back_inserter(rows));
|
||||
|
||||
auto delete_ranges = part_gen.generate_delete_ranges(schema, rows);
|
||||
for (const auto& range : delete_ranges) {
|
||||
const auto delete_id = clustering_delete_id_mappings[range.start()->is_inclusive()][range.end()->is_inclusive()];
|
||||
env.execute_prepared(delete_id, {{
|
||||
cql3::raw_value::make_value(to_bytes(part_desc.dkey.key().get_component(schema, 0))),
|
||||
cql3::raw_value::make_value(to_bytes(range.start()->value().get_component(schema, 0))),
|
||||
cql3::raw_value::make_value(to_bytes(range.end()->value().get_component(schema, 0)))}}).get();
|
||||
}
|
||||
|
||||
std::sort(delete_ranges.begin(), delete_ranges.end(), clustering_range_less_compare(schema));
|
||||
|
||||
const auto tri_cmp = clustering_key::tri_compare(schema);
|
||||
|
||||
const auto deleted_ranges_deoverlapped = query::clustering_range::deoverlap(delete_ranges, tri_cmp);
|
||||
|
||||
std::vector<clustering_key> live_rows, dead_rows;
|
||||
auto ranges_it = deleted_ranges_deoverlapped.cbegin();
|
||||
const auto ranges_end = deleted_ranges_deoverlapped.cend();
|
||||
|
||||
for (auto& row : rows) {
|
||||
while (ranges_it != ranges_end && ranges_it->after(row, tri_cmp)) {
|
||||
++ranges_it;
|
||||
}
|
||||
if (ranges_it == ranges_end) {
|
||||
live_rows.push_back(std::move(row));
|
||||
continue;
|
||||
}
|
||||
if (ranges_it->before(row, tri_cmp)) {
|
||||
live_rows.push_back(std::move(row));
|
||||
continue;
|
||||
}
|
||||
if (ranges_it->contains(row, tri_cmp)) {
|
||||
dead_rows.push_back(std::move(row));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return clustering_row_generation_result{written_bytes, std::move(live_rows), std::move(dead_rows), std::move(delete_ranges)};
|
||||
}
|
||||
|
||||
static std::vector<clustering_key> merge_and_deduplicate_rows(const schema& schema, const std::vector<clustering_key>& a,
|
||||
const std::vector<clustering_key>& b) {
|
||||
std::vector<clustering_key> merged_rows;
|
||||
merged_rows.reserve(a.size() + b.size());
|
||||
std::merge(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(merged_rows), clustering_key::less_compare(schema));
|
||||
merged_rows.erase(std::unique(merged_rows.begin(), merged_rows.end(), clustering_key::equality(schema)), merged_rows.end());
|
||||
return merged_rows;
|
||||
}
|
||||
|
||||
static query::clustering_row_ranges merge_ranges(const schema& schema, const query::clustering_row_ranges& a, const query::clustering_row_ranges& b) {
|
||||
query::clustering_row_ranges merged_ranges;
|
||||
merged_ranges.reserve(a.size() + b.size());
|
||||
std::merge(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(merged_ranges), clustering_range_less_compare(schema));
|
||||
return merged_ranges;
|
||||
}
|
||||
|
||||
struct partition_generation_result {
|
||||
size_t written_partition_count = 0;
|
||||
size_t written_row_count = 0;
|
||||
size_t written_rt_count = 0;
|
||||
size_t written_bytes = 0;
|
||||
std::vector<partition_description> partitions;
|
||||
};
|
||||
|
||||
static partition_generation_result generate_partitions(
|
||||
cql_test_env& env,
|
||||
const schema& schema,
|
||||
population_generator& pop_gen,
|
||||
const cql3::prepared_cache_key_type& static_insert_id,
|
||||
const cql3::prepared_cache_key_type& clustering_insert_id,
|
||||
const std::array<std::array<cql3::prepared_cache_key_type, 2>, 2>& clustering_delete_id_mappings) {
|
||||
|
||||
size_t written_row_count = 0;
|
||||
size_t written_rt_count = 0;
|
||||
size_t written_bytes = 0;
|
||||
|
||||
// The generator is allowed to produce duplicates.
|
||||
// Use a map to deduplicate partitions (and while at it keep them sorted).
|
||||
std::map<dht::decorated_key, partition_description, dht::decorated_key::less_comparator> partitions(
|
||||
dht::decorated_key::less_comparator(schema.shared_from_this()));
|
||||
|
||||
const auto part_count = pop_gen.partition_count();
|
||||
for (size_t i = 0; i < part_count; ++i) {
|
||||
auto part_gen = pop_gen.make_partition_content_generator();
|
||||
partition_description part_desc(dht::decorate_key(schema, part_gen->generate_partition_key(schema)));
|
||||
written_bytes += part_desc.dkey.key().external_memory_usage();
|
||||
|
||||
{
|
||||
auto size = maybe_generate_static_row(env, schema, *part_gen, static_insert_id, part_desc);
|
||||
written_bytes += size;
|
||||
part_desc.has_static_row = size > 0;
|
||||
}
|
||||
{
|
||||
auto res = generate_clustering_rows(env, schema, *part_gen, clustering_insert_id, clustering_delete_id_mappings, part_desc);
|
||||
written_bytes += res.written_bytes;
|
||||
written_row_count += std::max(res.live_rows.size() + res.dead_rows.size(), size_t(part_desc.has_static_row));
|
||||
written_rt_count += res.range_tombstones.size();
|
||||
part_desc.live_rows = std::move(res.live_rows);
|
||||
part_desc.dead_rows = std::move(res.dead_rows);
|
||||
part_desc.range_tombstones = std::move(res.range_tombstones);
|
||||
}
|
||||
|
||||
auto it = partitions.find(part_desc.dkey);
|
||||
if (it == partitions.end()) {
|
||||
auto key = part_desc.dkey;
|
||||
partitions.emplace(std::move(key), std::move(part_desc));
|
||||
} else {
|
||||
it->second.has_static_row |= part_desc.has_static_row;
|
||||
it->second.live_rows = merge_and_deduplicate_rows(schema, part_desc.live_rows, it->second.live_rows);
|
||||
it->second.dead_rows = merge_and_deduplicate_rows(schema, part_desc.dead_rows, it->second.dead_rows);
|
||||
it->second.range_tombstones = merge_ranges(schema, part_desc.range_tombstones, it->second.range_tombstones);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<partition_description> partition_vec;
|
||||
partition_vec.reserve(partitions.size());
|
||||
auto partition_values = partitions | boost::adaptors::map_values;
|
||||
std::move(partition_values.begin(), partition_values.end(), std::back_inserter(partition_vec));
|
||||
|
||||
return partition_generation_result{part_count, written_row_count, written_rt_count, written_bytes, std::move(partition_vec)};
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
population_description create_test_table(cql_test_env& env, const sstring& ks_name, const sstring& table_name,
|
||||
std::unique_ptr<population_generator> pop_gen) {
|
||||
env.execute_cql(format("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'SimpleStrategy', 'replication_factor' : 1}};", ks_name)).get();
|
||||
env.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, s blob static, v blob, PRIMARY KEY(pk, ck));", ks_name, table_name)).get();
|
||||
|
||||
const auto static_insert_id = env.prepare(format("INSERT INTO {}.{} (\"pk\", \"s\") VALUES (?, ?);", ks_name, table_name)).get0();
|
||||
const auto clustering_insert_id = env.prepare(format("INSERT INTO {}.{} (\"pk\", \"ck\", \"v\") VALUES (?, ?, ?);", ks_name, table_name)).get0();
|
||||
|
||||
const auto clustering_delete_id_open_open = env.prepare(format("DELETE FROM {}.{} WHERE pk = ? AND ck > ? AND ck < ?;", ks_name,
|
||||
table_name)).get0();
|
||||
const auto clustering_delete_id_open_closed = env.prepare(format("DELETE FROM {}.{} WHERE pk = ? AND ck > ? AND ck <= ?;", ks_name,
|
||||
table_name)).get0();
|
||||
const auto clustering_delete_id_closed_open = env.prepare(format("DELETE FROM {}.{} WHERE pk = ? AND ck >= ? AND ck < ?;", ks_name,
|
||||
table_name)).get0();
|
||||
const auto clustering_delete_id_closed_closed = env.prepare(format("DELETE FROM {}.{} WHERE pk = ? AND ck >= ? AND ck <= ?;", ks_name,
|
||||
table_name)).get0();
|
||||
|
||||
// Indexing with `range_bound::is_inclusive()` will select the correct id to use.
|
||||
const std::array<std::array<cql3::prepared_cache_key_type, 2>, 2> clustering_delete_id_mappings = {{
|
||||
{clustering_delete_id_open_open, clustering_delete_id_open_closed},
|
||||
{clustering_delete_id_closed_open, clustering_delete_id_closed_closed}}};
|
||||
|
||||
population_description pop_desc;
|
||||
|
||||
pop_desc.schema = env.local_db().find_column_family(ks_name, table_name).schema();
|
||||
|
||||
testlog.info("Populating test data...");
|
||||
|
||||
auto res = generate_partitions(env, *pop_desc.schema, *pop_gen, static_insert_id, clustering_insert_id, clustering_delete_id_mappings);
|
||||
pop_desc.partitions = std::move(res.partitions);
|
||||
|
||||
uint64_t live_row_count = 0;
|
||||
uint64_t dead_row_count = 0;
|
||||
for (auto& part : pop_desc.partitions) {
|
||||
live_row_count += std::max(part.live_rows.size(), uint64_t(part.has_static_row));
|
||||
dead_row_count += part.dead_rows.size();
|
||||
testlog.trace("Partition {}, has_static_rows={}, rows={}, (of which live={} and dead={})",
|
||||
part.dkey,
|
||||
part.has_static_row,
|
||||
part.live_rows.size() + part.dead_rows.size(),
|
||||
part.live_rows.size(),
|
||||
part.dead_rows.size());
|
||||
}
|
||||
|
||||
// We don't expect this to fail, this is here more to validate our
|
||||
// expectation of the population, then the correctness of writes.
|
||||
auto msg = env.execute_cql(format("SELECT COUNT(*) FROM {}.{}", ks_name, table_name)).get0();
|
||||
assert_that(msg).is_rows().with_rows({{serialized(int64_t(live_row_count))}});
|
||||
|
||||
testlog.info("Done. Population summary: written {} partitions, {} rows, {} range tombstones and {} bytes;"
|
||||
" have (after de-duplication) {} partitions, {} live rows and {} dead rows.",
|
||||
res.written_partition_count,
|
||||
res.written_row_count,
|
||||
res.written_rt_count,
|
||||
res.written_bytes,
|
||||
pop_desc.partitions.size(),
|
||||
live_row_count,
|
||||
dead_row_count);
|
||||
|
||||
return pop_desc;
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "mutation/mutation.hh"
|
||||
#include "mutation/mutation_fragment.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/random_schema.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "types/list.hh"
|
||||
@@ -103,7 +104,17 @@ type_generator::type_generator(random_schema_specification& spec) : _spec(spec)
|
||||
|
||||
data_type type_generator::operator()(std::mt19937& engine, is_multi_cell multi_cell) {
|
||||
auto dist = std::uniform_int_distribution<size_t>(0, _generators.size() - 1);
|
||||
return _generators.at(dist(engine))(engine, multi_cell);
|
||||
auto type = _generators.at(dist(engine))(engine, multi_cell);
|
||||
// duration type is not allowed in:
|
||||
// * primary key components
|
||||
// * as member types of collections
|
||||
//
|
||||
// To cover all this, we simply disallow it altogether when multi_cell is
|
||||
// no, which will be the case in all the above cases.
|
||||
while (!multi_cell && type == duration_type) {
|
||||
type = (*this)(engine, multi_cell);
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
namespace {
|
||||
@@ -133,12 +144,13 @@ private:
|
||||
std::vector<data_type> generate_types(std::mt19937& engine, std::uniform_int_distribution<size_t>& count_dist,
|
||||
type_generator::is_multi_cell multi_cell, bool allow_reversed = false) {
|
||||
std::uniform_int_distribution<uint8_t> reversed_dist{0, uint8_t(allow_reversed)};
|
||||
std::uniform_int_distribution<uint8_t> multi_cell_dist{0, uint8_t(bool(multi_cell))};
|
||||
|
||||
std::vector<data_type> types;
|
||||
|
||||
const auto count = count_dist(engine);
|
||||
for (size_t c = 0; c < count; ++c) {
|
||||
auto type = _type_generator(engine, multi_cell);
|
||||
auto type = _type_generator(engine, type_generator::is_multi_cell(bool(multi_cell_dist(engine))));
|
||||
if (reversed_dist(engine)) {
|
||||
types.emplace_back(make_shared<reversed_type_impl>(std::move(type)));
|
||||
} else {
|
||||
@@ -169,7 +181,7 @@ public:
|
||||
return format("table{}", generate_unique_id(engine, _used_table_ids));
|
||||
}
|
||||
virtual sstring udt_name(std::mt19937& engine) override {
|
||||
return format("UDT{}", generate_unique_id(engine, _used_udt_ids));
|
||||
return format("udt{}", generate_unique_id(engine, _used_udt_ids));
|
||||
}
|
||||
virtual std::vector<data_type> partition_key_columns(std::mt19937& engine) override {
|
||||
return generate_types(engine, _partition_column_count_dist, type_generator::is_multi_cell::no, false);
|
||||
@@ -760,13 +772,13 @@ schema_ptr build_random_schema(uint32_t seed, random_schema_specification& spec)
|
||||
builder.with_column(to_bytes(format("pk{}", pk)), std::move(pk_columns[pk]), column_kind::partition_key);
|
||||
}
|
||||
|
||||
if (const auto ck_columns = spec.clustering_key_columns(engine); !ck_columns.empty()) {
|
||||
for (size_t ck = 0; ck < ck_columns.size(); ++ck) {
|
||||
builder.with_column(to_bytes(format("ck{}", ck)), std::move(ck_columns[ck]), column_kind::clustering_key);
|
||||
}
|
||||
const auto ck_columns = spec.clustering_key_columns(engine);
|
||||
for (size_t ck = 0; ck < ck_columns.size(); ++ck) {
|
||||
builder.with_column(to_bytes(format("ck{}", ck)), std::move(ck_columns[ck]), column_kind::clustering_key);
|
||||
}
|
||||
|
||||
if (const auto static_columns = spec.static_columns(engine); !static_columns.empty()) {
|
||||
if (!ck_columns.empty()) {
|
||||
const auto static_columns = spec.static_columns(engine);
|
||||
for (size_t s = 0; s < static_columns.size(); ++s) {
|
||||
builder.with_column(to_bytes(format("s{}", s)), std::move(static_columns[s]), column_kind::static_column);
|
||||
}
|
||||
@@ -782,48 +794,52 @@ schema_ptr build_random_schema(uint32_t seed, random_schema_specification& spec)
|
||||
}
|
||||
|
||||
sstring udt_to_str(const user_type_impl& udt) {
|
||||
std::vector<sstring> fields;
|
||||
for (size_t i = 0; i < udt.field_types().size(); ++i) {
|
||||
fields.emplace_back(format("{} {}", udt.field_name_as_string(i), udt.field_type(i)->as_cql3_type().to_string()));
|
||||
}
|
||||
return format("CREATE TYPE {} (\n\t{})",
|
||||
udt.get_name_as_string(),
|
||||
boost::algorithm::join(fields, ",\n\t"));
|
||||
std::stringstream ss;
|
||||
udt.describe(ss);
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
// Single element overload, for convenience.
|
||||
std::unordered_set<const user_type_impl*> dump_udts(data_type type) {
|
||||
if (auto maybe_user_type = dynamic_cast<const user_type_impl*>(type.get())) {
|
||||
return {maybe_user_type};
|
||||
}
|
||||
return {};
|
||||
}
|
||||
struct udt_list {
|
||||
std::vector<const user_type_impl*> vector;
|
||||
|
||||
std::unordered_set<const user_type_impl*> dump_udts(const std::vector<data_type>& types) {
|
||||
std::unordered_set<const user_type_impl*> udts;
|
||||
void insert(const user_type_impl* udt) {
|
||||
auto it = std::find(vector.begin(), vector.end(), udt);
|
||||
if (it == vector.end()) {
|
||||
vector.push_back(udt);
|
||||
}
|
||||
}
|
||||
|
||||
void merge(udt_list other) {
|
||||
for (auto& udt : other.vector) {
|
||||
insert(udt);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
udt_list dump_udts(const std::vector<data_type>& types) {
|
||||
udt_list udts;
|
||||
for (const auto& dt : types) {
|
||||
const auto* const type = dt.get();
|
||||
if (auto maybe_user_type = dynamic_cast<const user_type_impl*>(type)) {
|
||||
udts.insert(maybe_user_type);
|
||||
udts.merge(dump_udts(maybe_user_type->field_types()));
|
||||
udts.insert(maybe_user_type);
|
||||
} else if (auto maybe_tuple_type = dynamic_cast<const tuple_type_impl*>(type)) {
|
||||
udts.merge(dump_udts(maybe_tuple_type->all_types()));
|
||||
} else if (auto maybe_list_type = dynamic_cast<const list_type_impl*>(type)) {
|
||||
udts.merge(dump_udts(maybe_list_type->get_elements_type()));
|
||||
udts.merge(dump_udts({maybe_list_type->get_elements_type()}));
|
||||
} else if (auto maybe_set_type = dynamic_cast<const set_type_impl*>(type)) {
|
||||
udts.merge(dump_udts(maybe_set_type->get_elements_type()));
|
||||
udts.merge(dump_udts({maybe_set_type->get_elements_type()}));
|
||||
} else if (auto maybe_map_type = dynamic_cast<const map_type_impl*>(type)) {
|
||||
udts.merge(dump_udts(maybe_map_type->get_keys_type()));
|
||||
udts.merge(dump_udts(maybe_map_type->get_values_type()));
|
||||
udts.merge(dump_udts({maybe_map_type->get_keys_type(), maybe_map_type->get_values_type()}));
|
||||
} else if (auto maybe_reversed_type = dynamic_cast<const reversed_type_impl*>(type)) {
|
||||
udts.merge(dump_udts(maybe_reversed_type->underlying_type()));
|
||||
udts.merge(dump_udts({maybe_reversed_type->underlying_type()}));
|
||||
}
|
||||
}
|
||||
return udts;
|
||||
}
|
||||
|
||||
std::vector<sstring> dump_udts(const schema& schema) {
|
||||
std::unordered_set<const user_type_impl*> udts;
|
||||
std::vector<const user_type_impl*> dump_udts(const schema& schema) {
|
||||
udt_list udts;
|
||||
|
||||
const auto cdefs_to_types = [] (const schema::const_iterator_range_type& cdefs) -> std::vector<data_type> {
|
||||
return boost::copy_range<std::vector<data_type>>(cdefs |
|
||||
@@ -835,8 +851,7 @@ std::vector<sstring> dump_udts(const schema& schema) {
|
||||
udts.merge(dump_udts(cdefs_to_types(schema.regular_columns())));
|
||||
udts.merge(dump_udts(cdefs_to_types(schema.static_columns())));
|
||||
|
||||
return boost::copy_range<std::vector<sstring>>(udts |
|
||||
boost::adaptors::transformed([] (const user_type_impl* const udt) { return udt_to_str(*udt); }));
|
||||
return udts.vector;
|
||||
}
|
||||
|
||||
std::vector<sstring> columns_specs(schema_ptr schema, column_kind kind) {
|
||||
@@ -936,7 +951,8 @@ sstring random_schema::cql() const {
|
||||
|
||||
sstring udts_str;
|
||||
if (!udts.empty()) {
|
||||
udts_str = boost::algorithm::join(udts, "\n");
|
||||
udts_str = boost::algorithm::join(udts |
|
||||
boost::adaptors::transformed([] (const user_type_impl* const udt) { return udt_to_str(*udt); }), "\n");
|
||||
}
|
||||
|
||||
std::vector<sstring> col_specs;
|
||||
@@ -1064,6 +1080,89 @@ void random_schema::delete_range(
|
||||
md.add_range_tombstone(std::move(range), tombstone{ts_gen(engine, timestamp_destination::range_tombstone, api::min_timestamp), deletion_time});
|
||||
}
|
||||
|
||||
future<> random_schema::create_with_cql(cql_test_env& env) {
|
||||
return async([this, &env] {
|
||||
const auto ks_name = _schema->ks_name();
|
||||
const auto tbl_name = _schema->cf_name();
|
||||
|
||||
for (const auto& udt : dump_udts(*_schema)) {
|
||||
env.execute_cql(udt_to_str(*udt)).get();
|
||||
eventually_true([&] () mutable {
|
||||
return env.db().map_reduce0([&] (replica::database& db) {
|
||||
return db.user_types().get(ks_name).has_type(udt->get_name());
|
||||
}, true, std::logical_and<bool>{}).get();
|
||||
});
|
||||
}
|
||||
|
||||
auto& db = env.local_db();
|
||||
|
||||
std::stringstream ss;
|
||||
_schema->describe(db, ss, false);
|
||||
|
||||
env.execute_cql(ss.str()).get();
|
||||
|
||||
env.require_table_exists(ks_name, tbl_name).get();
|
||||
auto& tbl = db.find_column_family(ks_name, tbl_name);
|
||||
|
||||
_schema = tbl.schema();
|
||||
});
|
||||
}
|
||||
|
||||
future<std::vector<mutation>> generate_random_mutations(
|
||||
uint32_t seed,
|
||||
tests::random_schema& random_schema,
|
||||
timestamp_generator ts_gen,
|
||||
expiry_generator exp_gen,
|
||||
std::uniform_int_distribution<size_t> partition_count_dist,
|
||||
std::uniform_int_distribution<size_t> clustering_row_count_dist,
|
||||
std::uniform_int_distribution<size_t> range_tombstone_count_dist) {
|
||||
auto engine = std::mt19937(seed);
|
||||
const auto schema_has_clustering_columns = random_schema.schema()->clustering_key_size() > 0;
|
||||
const auto partition_count = partition_count_dist(engine);
|
||||
std::vector<mutation> muts;
|
||||
muts.reserve(partition_count);
|
||||
for (size_t pk = 0; pk != partition_count; ++pk) {
|
||||
auto mut = random_schema.new_mutation(pk);
|
||||
random_schema.set_partition_tombstone(engine, mut, ts_gen, exp_gen);
|
||||
random_schema.add_static_row(engine, mut, ts_gen, exp_gen);
|
||||
|
||||
if (!schema_has_clustering_columns) {
|
||||
muts.emplace_back(mut.build(random_schema.schema()));
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto clustering_row_count = clustering_row_count_dist(engine);
|
||||
const auto range_tombstone_count = range_tombstone_count_dist(engine);
|
||||
auto ckeys = random_schema.make_ckeys(std::max(clustering_row_count, range_tombstone_count));
|
||||
|
||||
for (uint32_t ck = 0; ck < ckeys.size(); ++ck) {
|
||||
random_schema.add_row(engine, mut, ckeys[ck], ts_gen, exp_gen);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < range_tombstone_count; ++i) {
|
||||
const auto a = tests::random::get_int<size_t>(0, ckeys.size() - 1, engine);
|
||||
const auto b = tests::random::get_int<size_t>(0, ckeys.size() - 1, engine);
|
||||
random_schema.delete_range(
|
||||
engine,
|
||||
mut,
|
||||
nonwrapping_range<tests::data_model::mutation_description::key>::make(ckeys.at(std::min(a, b)), ckeys.at(std::max(a, b))),
|
||||
ts_gen,
|
||||
exp_gen);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
muts.emplace_back(mut.build(random_schema.schema()));
|
||||
}
|
||||
boost::sort(muts, [s = random_schema.schema()] (const mutation& a, const mutation& b) {
|
||||
return a.decorated_key().less_compare(*s, b.decorated_key());
|
||||
});
|
||||
auto range = boost::unique(muts, [s = random_schema.schema()] (const mutation& a, const mutation& b) {
|
||||
return a.decorated_key().equal(*s, b.decorated_key());
|
||||
});
|
||||
muts.erase(range.end(), muts.end());
|
||||
co_return std::move(muts);
|
||||
}
|
||||
|
||||
future<std::vector<mutation>> generate_random_mutations(
|
||||
tests::random_schema& random_schema,
|
||||
timestamp_generator ts_gen,
|
||||
@@ -1071,52 +1170,8 @@ future<std::vector<mutation>> generate_random_mutations(
|
||||
std::uniform_int_distribution<size_t> partition_count_dist,
|
||||
std::uniform_int_distribution<size_t> clustering_row_count_dist,
|
||||
std::uniform_int_distribution<size_t> range_tombstone_count_dist) {
|
||||
auto engine = std::mt19937(tests::random::get_int<uint32_t>());
|
||||
const auto schema_has_clustering_columns = random_schema.schema()->clustering_key_size() > 0;
|
||||
const auto partition_count = partition_count_dist(engine);
|
||||
std::vector<mutation> muts;
|
||||
muts.reserve(partition_count);
|
||||
return do_with(std::move(engine), std::move(muts), [=, &random_schema] (std::mt19937& engine,
|
||||
std::vector<mutation>& muts) mutable {
|
||||
auto r = boost::irange(size_t{0}, partition_count);
|
||||
return do_for_each(r.begin(), r.end(), [=, &random_schema, &engine, &muts] (size_t pk) mutable {
|
||||
auto mut = random_schema.new_mutation(pk);
|
||||
random_schema.set_partition_tombstone(engine, mut, ts_gen, exp_gen);
|
||||
random_schema.add_static_row(engine, mut, ts_gen, exp_gen);
|
||||
|
||||
if (!schema_has_clustering_columns) {
|
||||
muts.emplace_back(mut.build(random_schema.schema()));
|
||||
return;
|
||||
}
|
||||
|
||||
auto ckeys = random_schema.make_ckeys(clustering_row_count_dist(engine));
|
||||
const auto clustering_row_count = ckeys.size();
|
||||
for (uint32_t ck = 0; ck < clustering_row_count; ++ck) {
|
||||
random_schema.add_row(engine, mut, ckeys[ck], ts_gen, exp_gen);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < 4; ++i) {
|
||||
const auto a = tests::random::get_int<size_t>(0, ckeys.size() - 1, engine);
|
||||
const auto b = tests::random::get_int<size_t>(0, ckeys.size() - 1, engine);
|
||||
random_schema.delete_range(
|
||||
engine,
|
||||
mut,
|
||||
nonwrapping_range<tests::data_model::mutation_description::key>::make(ckeys.at(std::min(a, b)), ckeys.at(std::max(a, b))),
|
||||
ts_gen,
|
||||
exp_gen);
|
||||
}
|
||||
muts.emplace_back(mut.build(random_schema.schema()));
|
||||
}).then([&random_schema, &muts] () mutable {
|
||||
boost::sort(muts, [s = random_schema.schema()] (const mutation& a, const mutation& b) {
|
||||
return a.decorated_key().less_compare(*s, b.decorated_key());
|
||||
});
|
||||
auto range = boost::unique(muts, [s = random_schema.schema()] (const mutation& a, const mutation& b) {
|
||||
return a.decorated_key().equal(*s, b.decorated_key());
|
||||
});
|
||||
muts.erase(range.end(), muts.end());
|
||||
return std::move(muts);
|
||||
});
|
||||
});
|
||||
return generate_random_mutations(tests::random::get_int<uint32_t>(), random_schema, std::move(ts_gen), std::move(exp_gen), partition_count_dist,
|
||||
clustering_row_count_dist, range_tombstone_count_dist);
|
||||
}
|
||||
|
||||
future<std::vector<mutation>> generate_random_mutations(tests::random_schema& random_schema, size_t partition_count) {
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
/// Random schema and random data generation related utilities.
|
||||
///
|
||||
|
||||
class cql_test_env;
|
||||
|
||||
namespace tests {
|
||||
|
||||
class random_schema_specification {
|
||||
@@ -173,6 +175,13 @@ public:
|
||||
|
||||
sstring cql() const;
|
||||
|
||||
/// Create the generated schema as a table via CQL.
|
||||
///
|
||||
/// Along with all its dependencies, like UDTs.
|
||||
/// The underlying schema_ptr instance is replaced with the one from the
|
||||
/// local table instance.
|
||||
future<> create_with_cql(cql_test_env& env);
|
||||
|
||||
/// Make a partition key which is n-th in some arbitrary sequence of keys.
|
||||
///
|
||||
/// There is no particular order for the keys, they're not in ring order.
|
||||
@@ -236,7 +245,17 @@ public:
|
||||
/// `clustering_row_count_dist` and `range_tombstone_count_dist` will be used to
|
||||
/// generate the respective counts for *each* partition. These params are
|
||||
/// ignored if the schema has no clustering columns.
|
||||
/// Mutations are returned in ring order. Does not contain duplicate partitions.
|
||||
/// Futurized to avoid stalls.
|
||||
future<std::vector<mutation>> generate_random_mutations(
|
||||
uint32_t seed,
|
||||
tests::random_schema& random_schema,
|
||||
timestamp_generator ts_gen = default_timestamp_generator(),
|
||||
expiry_generator exp_gen = no_expiry_expiry_generator(),
|
||||
std::uniform_int_distribution<size_t> partition_count_dist = std::uniform_int_distribution<size_t>(8, 16),
|
||||
std::uniform_int_distribution<size_t> clustering_row_count_dist = std::uniform_int_distribution<size_t>(16, 128),
|
||||
std::uniform_int_distribution<size_t> range_tombstone_count_dist = std::uniform_int_distribution<size_t>(4, 16));
|
||||
|
||||
future<std::vector<mutation>> generate_random_mutations(
|
||||
tests::random_schema& random_schema,
|
||||
timestamp_generator ts_gen = default_timestamp_generator(),
|
||||
|
||||
@@ -1,134 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2018-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
/// Helper functions for creating and populating a test table.
|
||||
///
|
||||
/// See the various `create_test_table()` overloads for more details.
|
||||
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
|
||||
namespace test {
|
||||
|
||||
class partition_content_generator {
|
||||
public:
|
||||
struct row {
|
||||
clustering_key key;
|
||||
bytes val;
|
||||
};
|
||||
|
||||
virtual ~partition_content_generator() = default;
|
||||
|
||||
// Allowed to generate duplicates.
|
||||
virtual partition_key generate_partition_key(const schema& schema) = 0;
|
||||
virtual bool has_static_row() = 0;
|
||||
virtual bytes generate_static_row(const schema& schema, const partition_key& pk) = 0;
|
||||
virtual int clustering_row_count() = 0;
|
||||
// Allowed to generate duplicates.
|
||||
virtual row generate_row(const schema& schema, const partition_key& pk) = 0;
|
||||
// Ranges can overlap. `rows` is the list of ck values the partition has, sorted.
|
||||
// Open bounds are not allowed.
|
||||
virtual query::clustering_row_ranges generate_delete_ranges(const schema& schema, const std::vector<clustering_key>& rows) = 0;
|
||||
};
|
||||
|
||||
class population_generator {
|
||||
public:
|
||||
virtual ~population_generator() = default;
|
||||
virtual size_t partition_count() = 0;
|
||||
virtual std::unique_ptr<partition_content_generator> make_partition_content_generator() = 0;
|
||||
};
|
||||
|
||||
struct partition_description {
|
||||
const dht::decorated_key dkey;
|
||||
bool has_static_row = false;
|
||||
// List of clustering keys, sorted.
|
||||
std::vector<clustering_key> live_rows;
|
||||
std::vector<clustering_key> dead_rows;
|
||||
// List of deleted clustering ranges, may overlap, sorted.
|
||||
query::clustering_row_ranges range_tombstones;
|
||||
|
||||
explicit partition_description(dht::decorated_key dkey)
|
||||
: dkey(std::move(dkey)) {
|
||||
}
|
||||
};
|
||||
|
||||
struct population_description {
|
||||
schema_ptr schema;
|
||||
// Sorted by ring order.
|
||||
// Exact number of generated partitions may differ from that returned by
|
||||
// `population_generator::partition_count()` as the generator is allowed
|
||||
// to generate duplicate partitions.
|
||||
std::vector<partition_description> partitions;
|
||||
};
|
||||
|
||||
struct partition_configuration {
|
||||
std::optional<std::uniform_int_distribution<int>> static_row_size_dist;
|
||||
std::uniform_int_distribution<int> clustering_row_count_dist;
|
||||
std::uniform_int_distribution<int> clustering_row_size_dist;
|
||||
std::uniform_int_distribution<int> range_deletion_count_dist;
|
||||
std::uniform_int_distribution<int> range_deletion_size_dist; // how many keys a range should include
|
||||
int count;
|
||||
};
|
||||
|
||||
using generate_blob_function = noncopyable_function<bytes(const schema& schema, size_t size, const partition_key& pk,
|
||||
const clustering_key* const ck)>;
|
||||
|
||||
/// Return those keys that overlap with at least one range.
|
||||
///
|
||||
/// \param keys sorted list of ck values.
|
||||
/// \param ranges sorted and de-duplicated list of ranges.
|
||||
std::vector<clustering_key> slice_keys(const schema& schema, const std::vector<clustering_key>& keys, const query::clustering_row_ranges& ranges);
|
||||
|
||||
/// Create and populate a test table (beginner version).
|
||||
///
|
||||
/// The keyspace and table are created as:
|
||||
///
|
||||
/// CREATE KEYSPACE
|
||||
/// ${ks_name}
|
||||
/// WITH
|
||||
/// REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}
|
||||
///
|
||||
/// CREATE TABLE
|
||||
/// ${ks_name}.${table_name}
|
||||
/// (pk int, ck int, s blob static, v blob, PRIMARY KEY(pk, ck))
|
||||
///
|
||||
/// The table is populated with partitions 0..partition_count and each partition
|
||||
/// is populated with clustering rows 0..row_per_partition_count.
|
||||
/// No static row (column) is written.
|
||||
/// For each row `v` contains: `pk ^ ck`.
|
||||
///
|
||||
/// \returns the schema and a vector of the written partition keys (decorated).
|
||||
/// The vector is sorted by ring order.
|
||||
std::pair<schema_ptr, std::vector<dht::decorated_key>> create_test_table(cql_test_env& env, const sstring& ks_name, const sstring& table_name,
|
||||
int partition_count = 10 * smp::count, int row_per_partition_count = 10);
|
||||
|
||||
/// Create and populate a test table (advanced version).
|
||||
///
|
||||
/// Uses the same schema as the "beginner version".
|
||||
/// Populates the table according to the passed in
|
||||
/// `population_distribution configuration`.
|
||||
/// Values for the column `v` and `s` are generated with the passed in
|
||||
/// `gen_blob` function.
|
||||
/// Allows for generating non-trivial random population in a controlled way.
|
||||
/// The partition configurations will be processed in a random order, using a
|
||||
/// deterministic pseudo-random engine. Passing the same seed will yield the
|
||||
/// same population.
|
||||
///
|
||||
/// \returns the description of the generated population.
|
||||
population_description create_test_table(cql_test_env& env, const sstring& ks_name, const sstring& table_name, uint32_t seed,
|
||||
std::vector<partition_configuration> part_configs, generate_blob_function gen_blob);
|
||||
|
||||
/// Create and populate a test table (expert version).
|
||||
///
|
||||
/// Uses the same schema as the "beginner version".
|
||||
/// Allows for a fully customized population of the test table.
|
||||
///
|
||||
/// \returns the description of the generated population.
|
||||
population_description create_test_table(cql_test_env& env, const sstring& ks_name, const sstring& table_name,
|
||||
std::unique_ptr<population_generator> pop_gen);
|
||||
|
||||
} // namespace test
|
||||
Reference in New Issue
Block a user