cdc: generation: introduce get_common_cdc_generation_mutations

In the following commit, we implement the
get_cdc_generation_mutations_v3 function, which is very similar to
get_cdc_generation_mutations_v2. The only differences in creating
mutations between CDC_GENERATIONS_V2 and CDC_GENERATIONS_V3 are:
- the num_ranges cell, which needs to be set only for CDC_GENERATIONS_V2,
- different partition keys,
- different clustering keys.

To avoid code duplication, we introduce
get_common_cdc_generation_mutations, which does most of the work
shared by both functions.
This commit is contained in:
Patryk Jędrzejczak
2023-09-06 16:01:56 +02:00
parent ed1c1369d9
commit 29f54836d0

View File

@@ -282,21 +282,21 @@ bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locat
}
}
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
static future<utils::chunked_vector<mutation>> get_common_cdc_generation_mutations(
schema_ptr s,
utils::UUID id,
const partition_key& pkey,
noncopyable_function<clustering_key (dht::token)>&& get_ckey_from_range_end,
const cdc::topology_description& desc,
size_t mutation_size_threshold,
api::timestamp_type ts) {
utils::chunked_vector<mutation> res;
res.emplace_back(s, partition_key::from_singular(*s, id));
res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts);
res.emplace_back(s, pkey);
size_t size_estimate = 0;
size_t total_size_estimate = 0;
for (auto& e : desc.entries()) {
if (size_estimate >= mutation_size_threshold) {
total_size_estimate += size_estimate;
res.emplace_back(s, partition_key::from_singular(*s, id));
res.emplace_back(s, pkey);
size_estimate = 0;
}
@@ -307,7 +307,7 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
}
size_estimate += e.streams.size() * 20;
auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end));
auto ckey = get_ckey_from_range_end(e.token_range_end);
res.back().set_cell(ckey, to_bytes("streams"), make_set_value(db::cdc_streams_set_type, std::move(streams)), ts);
res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts);
@@ -331,6 +331,22 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
co_return res;
}
// Builds the mutations describing CDC generation `desc` for the table with
// schema `s`, using the CDC_GENERATIONS_V2 key layout.
//
// The partition key is derived from the generation `id`, and each entry's
// clustering key is its token range end converted to int64. The bulk of the
// work is delegated to get_common_cdc_generation_mutations; this wrapper only
// supplies the keys and sets the `num_ranges` static cell afterwards.
//
// `mutation_size_threshold` and `ts` are forwarded unchanged to the common
// helper; `ts` is also used as the timestamp of the `num_ranges` cell.
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
schema_ptr s,
utils::UUID id,
const cdc::topology_description& desc,
size_t mutation_size_threshold,
api::timestamp_type ts) {
// `pkey` is passed to the helper by const reference; it is a local of this
// coroutine, so it stays alive across the co_await below.
auto pkey = partition_key::from_singular(*s, id);
// Captures the schema pointer by value so the callback does not depend on
// the lifetime of the parameter `s`.
auto get_ckey = [s] (dht::token range_end) {
return clustering_key::from_singular(*s, dht::token::to_int64(range_end));
};
auto res = co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts);
// The static cell is written to the last returned mutation.
// NOTE(review): this relies on the helper always returning at least one
// mutation and on all of them sharing `pkey` (so the static cell ends up in
// the same partition regardless of which mutation carries it) — confirm
// against the full helper body.
res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts);
co_return res;
}
// non-static for testing
size_t limit_of_streams_in_topology_description() {
// Each stream takes 16B and we don't want to exceed 4MB so we can have