system_kayspace: make CDC_GENERATIONS_V3 single-partition

We make CDC_GENERATIONS_V3 single-partition by adding the key
column and changing the clustering key from range_end to
(id, range_end). This is the first step to enabling the efficient
clearing of obsolete CDC generation data, which we need to prevent
Raft-topology snapshots from endlessly growing as we introduce new
generations over time. The next step is to change the type of the id
column to timeuuid. We do it in the following commits.

After making CDC_GENERATIONS_V3 single-partition, there is no easy
way of preserving the num_ranges column. As it is used only for
sanity checking, we remove it to simplify the implementation.
This commit is contained in:
Patryk Jędrzejczak
2023-09-06 09:29:59 +02:00
parent 29f54836d0
commit 2cd430ac80
6 changed files with 45 additions and 34 deletions

View File

@@ -347,6 +347,20 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
co_return res;
}
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
schema_ptr s,
utils::UUID id,
const cdc::topology_description& desc,
size_t mutation_size_threshold,
api::timestamp_type ts) {
auto pkey = partition_key::from_singular(*s, CDC_GENERATIONS_V3_KEY);
auto get_ckey = [&] (dht::token range_end) {
return clustering_key::from_exploded(*s, {uuid_type->decompose(id), long_type->decompose(dht::token::to_int64(range_end))}) ;
};
co_return co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts);
}
// non-static for testing
size_t limit_of_streams_in_topology_description() {
// Each stream takes 16B and we don't want to exceed 4MB so we can have

View File

@@ -155,4 +155,16 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&,
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
// The partition key of all rows in the single-partition CDC_GENERATIONS_V3 schema (in system keyspace).
static constexpr auto CDC_GENERATIONS_V3_KEY = "cdc_generations";
// Translates the CDC generation data given by a `cdc::topology_description` into a vector of mutations,
// using `mutation_size_threshold` to decide on the mutation sizes. The first clustering key column is
// given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`.
//
// Works only for the CDC_GENERATIONS_V3 schema (in system keyspace).
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&,
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
} // namespace cdc

View File

@@ -257,15 +257,18 @@ schema_ptr system_keyspace::cdc_generations_v3() {
thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, CDC_GENERATIONS_V3);
return schema_builder(NAME, CDC_GENERATIONS_V3, {id})
/* This is a single-partition table with key 'cdc_generations'. */
.with_column("key", utf8_type, column_kind::partition_key)
/* The unique identifier of this generation. */
.with_column("id", uuid_type, column_kind::partition_key)
.with_column("id", uuid_type, column_kind::clustering_key)
/* The generation describes a mapping from all tokens in the token ring to a set of stream IDs.
* This mapping is built from a bunch of smaller mappings, each describing how tokens in a
* subrange of the token ring are mapped to stream IDs; these subranges together cover the entire
* token ring. Each such range-local mapping is represented by a row of this table. The
* clustering key of the row is the end of the range being described by this row. The start of
* this range is the range_end of the previous row (in the clustering order, which is the integer
* order) or of the last row of this partition if this is the first the first row. */
* token ring. Each such range-local mapping is represented by a row of this table. The second
* column of the clustering key of the row is the end of the range being described by this row.
* The start of this range is the range_end of the previous row (in the clustering order, which
* is the integer order) or of the last row with the same id value if this is the first row with
* such id. */
.with_column("range_end", long_type, column_kind::clustering_key)
/* The set of streams mapped to in this range. The number of streams mapped to a single range in
* a CDC generation is bounded from above by the number of shards on the owner of that range in
@@ -278,10 +281,6 @@ schema_ptr system_keyspace::cdc_generations_v3() {
* range when the generation was first created. Together with the set of streams above it fully
* describes the mapping for this particular range. */
.with_column("ignore_msb", byte_type)
/* Column used for sanity checking. For a given generation it's equal to the number of ranges in
* this generation; thus, after the generation is fully inserted, it must be equal to the number
* of rows in the partition. */
.with_column("num_ranges", int32_type, column_kind::static_column)
.with_version(system_keyspace::generate_schema_version(id))
.build();
}();
@@ -2636,22 +2635,16 @@ future<service::topology> system_keyspace::load_topology_state() {
// Sanity check for CDC generation data consistency.
{
auto gen_rows = co_await execute_cql(
format("SELECT count(range_end) as cnt, num_ranges FROM system.{} WHERE id = ?",
CDC_GENERATIONS_V3),
format("SELECT count(range_end) as cnt FROM {}.{} WHERE key = '{}' AND id = ?",
NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY),
gen_uuid);
assert(gen_rows);
if (gen_rows->empty()) {
on_internal_error(slogger, format(
"load_topology_state: current CDC generation UUID ({}) present, but data missing", gen_uuid));
}
auto& row = gen_rows->one();
auto counted_ranges = row.get_as<int64_t>("cnt");
auto num_ranges = row.get_as<int32_t>("num_ranges");
if (counted_ranges != num_ranges) {
on_internal_error(slogger, format(
"load_topology_state: inconsistency in CDC generation data (UUID {}):"
" counted {} ranges, should be {}", gen_uuid, counted_ranges, num_ranges));
}
auto cnt = gen_rows->one().get_as<int64_t>("cnt");
slogger.debug("load_topology_state: current CDC generation UUID ({}), loaded {} ranges", gen_uuid, cnt);
}
} else {
if (!ret.normal_nodes.empty()) {
@@ -2719,10 +2712,9 @@ future<> system_keyspace::update_topology_fence_version(int64_t value) {
future<cdc::topology_description>
system_keyspace::read_cdc_generation(utils::UUID id) {
std::vector<cdc::token_range_description> entries;
size_t num_ranges = 0;
co_await _qp.query_internal(
format("SELECT range_end, streams, ignore_msb, num_ranges FROM {}.{} WHERE id = ?",
NAME, CDC_GENERATIONS_V3),
format("SELECT range_end, streams, ignore_msb FROM {}.{} WHERE key = '{}' AND id = ?",
NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY),
db::consistency_level::ONE,
{ id },
1000, // for ~1KB rows, ~1MB page size
@@ -2733,7 +2725,6 @@ system_keyspace::read_cdc_generation(utils::UUID id) {
dht::token::from_int64(row.get_as<int64_t>("range_end")),
std::move(streams),
uint8_t(row.get_as<int8_t>("ignore_msb"))});
num_ranges = row.get_as<int32_t>("num_ranges");
return make_ready_future<stop_iteration>(stop_iteration::no);
});
@@ -2743,12 +2734,6 @@ system_keyspace::read_cdc_generation(utils::UUID id) {
"read_cdc_generation: data for CDC generation {} not present", id));
}
if (entries.size() != num_ranges) {
throw std::runtime_error(format(
"read_cdc_generation: wrong number of rows. The `num_ranges` column claimed {} rows,"
" but reading the partition returned {}.", num_ranges, entries.size()));
}
co_return cdc::topology_description{std::move(entries)};
}

View File

@@ -165,16 +165,16 @@ When a node requests the cluster to join, the topology coordinator chooses token
The generation data described by `cdc::topology_description` is then translated into mutations and committed to group 0 using Raft commands. When a node applies these commands (every node in the cluster eventually does that, being a member of group 0), it writes the data into a local table `system.cdc_generations_v3`. The table has the following schema:
```
CREATE TABLE system.cdc_generations_v3 (
key text,
id uuid,
range_end bigint,
ignore_msb tinyint,
num_ranges int static,
streams frozen<set<blob>>,
PRIMARY KEY (id, range_end)
PRIMARY KEY (key, id, range_end)
) ...
```
The table's partition key is the `id uuid` column. The UUID used to insert a new generation into this table is randomly generated by the coordinator.
The table is single-partition where `key` always equals "cdc_generations". The UUID used to insert a new generation into this table is randomly generated by the coordinator.
The committed commands also update the `system.topology` table, storing the UUID in the `new_cdc_generation_data_uuid` column in the row which describes the joining node. Thanks to this, if the coordinator manages to insert the data but then fails, the next coordinator can resume from where the previous coordinator left off - using `new_cdc_generation_data_uuid` to continue with the generation switch.

View File

@@ -1142,7 +1142,7 @@ class topology_coordinator {
const size_t max_command_size = _raft.max_command_size();
const size_t mutation_size_threshold = max_command_size / 2;
auto gen_mutations = co_await cdc::get_cdc_generation_mutations_v2(
auto gen_mutations = co_await cdc::get_cdc_generation_mutations_v3(
gen_table_schema, gen_uuid, gen_desc, mutation_size_threshold, guard.write_timestamp());
co_return std::pair{gen_uuid, std::move(gen_mutations)};

View File

@@ -133,7 +133,7 @@ struct topology {
// This is the UUID used to access the data of a new CDC generation introduced
// e.g. when a new node bootstraps, needed in `commit_cdc_generation` transition state.
// It's used as partition key in CDC_GENERATIONS_V3 table.
// It's used as the first column of the clustering key in CDC_GENERATIONS_V3 table.
std::optional<utils::UUID> new_cdc_generation_data_uuid;
// The IDs of the commited yet unpublished CDC generations sorted by timestamps.