diff --git a/cdc/generation.cc b/cdc/generation.cc index 625e62e7d7..7d6243b300 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -282,21 +282,21 @@ bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locat } } -future> get_cdc_generation_mutations_v2( +static future> get_common_cdc_generation_mutations( schema_ptr s, - utils::UUID id, + const partition_key& pkey, + noncopyable_function&& get_ckey_from_range_end, const cdc::topology_description& desc, size_t mutation_size_threshold, api::timestamp_type ts) { utils::chunked_vector res; - res.emplace_back(s, partition_key::from_singular(*s, id)); - res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); + res.emplace_back(s, pkey); size_t size_estimate = 0; size_t total_size_estimate = 0; for (auto& e : desc.entries()) { if (size_estimate >= mutation_size_threshold) { total_size_estimate += size_estimate; - res.emplace_back(s, partition_key::from_singular(*s, id)); + res.emplace_back(s, pkey); size_estimate = 0; } @@ -307,7 +307,7 @@ future> get_cdc_generation_mutations_v2( } size_estimate += e.streams.size() * 20; - auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end)); + auto ckey = get_ckey_from_range_end(e.token_range_end); res.back().set_cell(ckey, to_bytes("streams"), make_set_value(db::cdc_streams_set_type, std::move(streams)), ts); res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts); @@ -331,6 +331,22 @@ future> get_cdc_generation_mutations_v2( co_return res; } +future> get_cdc_generation_mutations_v2( + schema_ptr s, + utils::UUID id, + const cdc::topology_description& desc, + size_t mutation_size_threshold, + api::timestamp_type ts) { + auto pkey = partition_key::from_singular(*s, id); + auto get_ckey = [s] (dht::token range_end) { + return clustering_key::from_singular(*s, dht::token::to_int64(range_end)); + }; + + auto res = co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts); + res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); + co_return res; +} + // non-static for testing size_t limit_of_streams_in_topology_description() { // Each stream takes 16B and we don't want to exceed 4MB so we can have